diff options
20 files changed, 978 insertions, 131 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/client/Session.java b/java/client/src/main/java/org/apache/qpidity/client/Session.java index 42f03acc7a..fa3bdf0934 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/Session.java +++ b/java/client/src/main/java/org/apache/qpidity/client/Session.java @@ -568,7 +568,7 @@ public interface Session * @param type Each exchange belongs to one of a set of exchange types implemented by the server. The
* exchange types define the functionality of the exchange - i.e. how messages are routed
* through it. It is not valid or meaningful to attempt to change the type of an existing
- * exchange. Default exchange types are: direct, topic, headers and fanout.
+ * exchange. Default exchange types are: direct, topic, headers and fanout.
* @param alternateExchange In the event that a message cannot be routed, this is the name of the exchange to which
* the message will be sent.
* @param options Set of options (valid options are: {@link Option#AUTO_DELETE}, {@link Option#DURABLE},
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/ConnectionFactoryImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/ConnectionFactoryImpl.java index 6a786e3edb..2c8ce94e27 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/ConnectionFactoryImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/ConnectionFactoryImpl.java @@ -1,100 +1,418 @@ package org.apache.qpidity.jms; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.JMSException; -import javax.jms.QueueConnection; -import javax.jms.QueueConnectionFactory; -import javax.jms.TopicConnection; -import javax.jms.TopicConnectionFactory; +import javax.jms.*; import javax.naming.NamingException; import javax.naming.Reference; import javax.naming.Referenceable; import javax.naming.StringRefAddr; import org.apache.qpidity.QpidException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public class ConnectionFactoryImpl implements ConnectionFactory,QueueConnectionFactory, TopicConnectionFactory, Referenceable -{ +public class ConnectionFactoryImpl implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, + XATopicConnectionFactory, XAQueueConnectionFactory, XAConnectionFactory, + Referenceable +{ + /** + * this ConnectionFactoryImpl's logger + */ + private static final Logger _logger = LoggerFactory.getLogger(ConnectionFactoryImpl.class); + + /** + * The virtual host on which the broker is deployed. + */ private String _host; + /** + * The port on which the broker is listening for connection. + */ private int _port; + /** + * The default user name used of user identification. + */ private String _defaultUsername; + /** + * The default password used of user identification. + */ private String _defaultPassword; - private String _virtualPath; + /** + * The virtual host on which the broker is deployed. + */ + private String _virtualHost; + /** + * The URL used to build this factory, (not yet supported) + */ private String _url; - + // Undefined at the moment public ConnectionFactoryImpl(String url) { _url = url; } - - public ConnectionFactoryImpl(String host,int port,String virtualHost,String defaultUsername,String defaultPassword) + + /** + * Create a connection. + * + * @param host The broker host name. + * @param port The port on which the broker is listening for connection. + * @param virtualHost The virtual host on which the broker is deployed. + * @param defaultUsername The user name used of user identification. + * @param defaultPassword The password used of user identification. + */ + public ConnectionFactoryImpl(String host, int port, String virtualHost, String defaultUsername, + String defaultPassword) { _host = host; _port = port; _defaultUsername = defaultUsername; _defaultPassword = defaultPassword; - _virtualPath = virtualHost; + _virtualHost = virtualHost; } - + + //-- Interface ConnectionFactory + + /** + * Creates a connection with the default user identity. + * <p> The connection is created in stopped mode. No messages + * will be delivered until the <code>Connection.start</code> method + * is explicitly called. + * + * @return A newly created connection. + * @throws JMSException If creating the connection fails due to some internal error. + * @throws JMSSecurityException If client authentication fails due to an invalid user name or password. + */ public Connection createConnection() throws JMSException - { + { try { - return new ConnectionImpl(_host,_port,_virtualPath,_defaultUsername,_defaultPassword); + return new ConnectionImpl(_host, _port, _virtualHost, _defaultUsername, _defaultPassword); } - catch(QpidException e) + catch (QpidException e) { - // need to convert the qpid exception into jms exception - throw new JMSException("",""); + if (_logger.isDebugEnabled()) + { + _logger.debug("PRoblem when creating connection", e); + } + throw ExceptionHelper.convertQpidExceptionToJMSException(e); } } + /** + * Creates a connection with the specified user identity. + * <p> The connection is created in stopped mode. No messages + * will be delivered until the <code>Connection.start</code> method + * is explicitly called. + * + * @param username the caller's user name + * @param password the caller's password + * @return A newly created connection. + * @throws JMSException If creating the connection fails due to some internal error. + * @throws JMSSecurityException If client authentication fails due to an invalid user name or password. + */ public Connection createConnection(String username, String password) throws JMSException { try { - return new ConnectionImpl(_host,_port,_virtualPath,username,password); + return new ConnectionImpl(_host, _port, _virtualHost, username, password); } - catch(QpidException e) + catch (QpidException e) { - // need to convert the qpid exception into jms exception - throw new JMSException("",""); + if (_logger.isDebugEnabled()) + { + _logger.debug("PRoblem when creating connection", e); + } + throw ExceptionHelper.convertQpidExceptionToJMSException(e); } } // ---------------------------------------- // Support for JMS 1.0 classes // ---------------------------------------- + //--- Interface QueueConnection + /** + * Creates a queueConnection with the default user identity. + * <p> The queueConnection is created in stopped mode. No messages + * will be delivered until the <code>Connection.start</code> method + * is explicitly called. + * + * @return A newly created queueConnection + * @throws JMSException If creating the queueConnection fails due to some internal error. + * @throws JMSSecurityException If client authentication fails due to an invalid user name or password. + */ public QueueConnection createQueueConnection() throws JMSException { - return (QueueConnection) createConnection(); + try + { + return new QueueConnectionImpl(_host, _port, _virtualHost, _defaultUsername, _defaultPassword); + } + catch (QpidException e) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("PRoblem when creating connection", e); + } + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } } + /** + * Creates a queueConnection with the specified user identity. + * <p> The queueConnection is created in stopped mode. No messages + * will be delivered until the <code>Connection.start</code> method + * is explicitly called. + * + * @param username the caller's user name + * @param password the caller's password + * @return A newly created queueConnection. + * @throws JMSException If creating the queueConnection fails due to some internal error. + * @throws JMSSecurityException If client authentication fails due to an invalid user name or password. + */ public QueueConnection createQueueConnection(String username, String password) throws JMSException { - return (QueueConnection) createConnection(username, password); + try + { + return new QueueConnectionImpl(_host, _port, _virtualHost, username, password); + } + catch (QpidException e) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("PRoblem when creating connection", e); + } + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } } + //--- Interface TopicConnection + /** + * Creates a topicConnection with the default user identity. + * <p> The topicConnection is created in stopped mode. No messages + * will be delivered until the <code>Connection.start</code> method + * is explicitly called. + * + * @return A newly created topicConnection + * @throws JMSException If creating the topicConnection fails due to some internal error. + * @throws JMSSecurityException If client authentication fails due to an invalid user name or password. + */ public TopicConnection createTopicConnection() throws JMSException { - return (TopicConnection) createConnection(); + try + { + return new TopicConnectionImpl(_host, _port, _virtualHost, _defaultUsername, _defaultPassword); + } + catch (QpidException e) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("PRoblem when creating connection", e); + } + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } } + /** + * Creates a topicConnection with the specified user identity. + * <p> The topicConnection is created in stopped mode. No messages + * will be delivered until the <code>Connection.start</code> method + * is explicitly called. + * + * @param username the caller's user name + * @param password the caller's password + * @return A newly created topicConnection. + * @throws JMSException If creating the topicConnection fails due to some internal error. + * @throws JMSSecurityException If client authentication fails due to an invalid user name or password. + */ public TopicConnection createTopicConnection(String username, String password) throws JMSException { - return (TopicConnection) createConnection(username, password); + try + { + return new TopicConnectionImpl(_host, _port, _virtualHost, username, password); + } + catch (QpidException e) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("PRoblem when creating connection", e); + } + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + } + + // --------------------------------------------------------------------------------------------------- + // the following methods are provided for XA compatibility + // --------------------------------------------------------------------------------------------------- + + /** + * Creates a XAConnection with the default user identity. + * <p> The XAConnection is created in stopped mode. No messages + * will be delivered until the <code>Connection.start</code> method + * is explicitly called. + * + * @return A newly created XAConnection + * @throws JMSException If creating the XAConnection fails due to some internal error. + * @throws JMSSecurityException If client authentication fails due to an invalid user name or password. + */ + public XAConnection createXAConnection() throws JMSException + { + try + { + return new XAConnectionImpl(_host, _port, _virtualHost, _defaultUsername, _defaultPassword); + } + catch (QpidException e) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("PRoblem when creating connection", e); + } + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + } + + /** + * Creates a XAConnection with the specified user identity. + * <p> The XAConnection is created in stopped mode. No messages + * will be delivered until the <code>Connection.start</code> method + * is explicitly called. + * + * @param username the caller's user name + * @param password the caller's password + * @return A newly created XAConnection. + * @throws JMSException If creating the XAConnection fails due to some internal error. + * @throws JMSSecurityException If client authentication fails due to an invalid user name or password. + */ + public XAConnection createXAConnection(String username, String password) throws JMSException + { + try + { + return new XAConnectionImpl(_host, _port, _virtualHost, username, password); + } + catch (QpidException e) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("PRoblem when creating connection", e); + } + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + } + + + /** + * Creates a XATopicConnection with the default user identity. + * <p> The XATopicConnection is created in stopped mode. No messages + * will be delivered until the <code>Connection.start</code> method + * is explicitly called. + * + * @return A newly created XATopicConnection + * @throws JMSException If creating the XATopicConnection fails due to some internal error. + * @throws JMSSecurityException If client authentication fails due to an invalid user name or password. + */ + public XATopicConnection createXATopicConnection() throws JMSException + { + try + { + return new XATopicConnectionImpl(_host, _port, _virtualHost, _defaultUsername, _defaultPassword); + } + catch (QpidException e) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("PRoblem when creating connection", e); + } + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } } - - + + /** + * Creates a XATopicConnection with the specified user identity. + * <p> The XATopicConnection is created in stopped mode. No messages + * will be delivered until the <code>Connection.start</code> method + * is explicitly called. + * + * @param username the caller's user name + * @param password the caller's password + * @return A newly created XATopicConnection. + * @throws JMSException If creating the XATopicConnection fails due to some internal error. + * @throws JMSSecurityException If client authentication fails due to an invalid user name or password. + */ + public XATopicConnection createXATopicConnection(String username, String password) throws JMSException + { + try + { + return new XATopicConnectionImpl(_host, _port, _virtualHost, username, password); + } + catch (QpidException e) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("PRoblem when creating connection", e); + } + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + } + + /** + * Creates a XAQueueConnection with the default user identity. + * <p> The XAQueueConnection is created in stopped mode. No messages + * will be delivered until the <code>Connection.start</code> method + * is explicitly called. + * + * @return A newly created XAQueueConnection + * @throws JMSException If creating the XAQueueConnection fails due to some internal error. + * @throws JMSSecurityException If client authentication fails due to an invalid user name or password. + */ + public XAQueueConnection createXAQueueConnection() throws JMSException + { + try + { + return new XAQueueConnectionImpl(_host, _port, _virtualHost, _defaultUsername, _defaultPassword); + } + catch (QpidException e) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("PRoblem when creating connection", e); + } + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + } + + /** + * Creates a XAQueueConnection with the specified user identity. + * <p> The XAQueueConnection is created in stopped mode. No messages + * will be delivered until the <code>Connection.start</code> method + * is explicitly called. + * + * @param username the caller's user name + * @param password the caller's password + * @return A newly created XAQueueConnection. + * @throws JMSException If creating the XAQueueConnection fails due to some internal error. + * @throws JMSSecurityException If client authentication fails due to an invalid user name or password. + */ + public XAQueueConnection createXAQueueConnection(String username, String password) throws JMSException + { + try + { + return new XAQueueConnectionImpl(_host, _port, _virtualHost, username, password); + } + catch (QpidException e) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("PRoblem when creating connection", e); + } + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + } + // ---------------------------------------- // Support for JNDI // ---------------------------------------- + public Reference getReference() throws NamingException { - return new Reference( ConnectionFactoryImpl.class.getName(), - new StringRefAddr(ConnectionFactoryImpl.class.getName(),_url)); + return new Reference(ConnectionFactoryImpl.class.getName(), + new StringRefAddr(ConnectionFactoryImpl.class.getName(), _url)); } } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java index 656a8cddd3..60dc126dcf 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java @@ -46,9 +46,9 @@ import org.slf4j.LoggerFactory; /** - * Implements javax.jms.Connection, javax.jms.QueueConnection adn javax.jms.TopicConnection + * Implements javax.jms.Connection, javax.jms.QueueConnection and javax.jms.TopicConnection */ -public class ConnectionImpl implements Connection, QueueConnection, TopicConnection, Referenceable +public class ConnectionImpl implements Connection, Referenceable { /** * This class's logger @@ -112,9 +112,17 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect //------ Constructors ---// /** - * TODO define the parameters + * Create a connection. + * + * @param host The broker host name. + * @param port The port on which the broker is listening for connection. + * @param virtualHost The virtual host on which the broker is deployed. + * @param username The user name used of user identification. + * @param password The password name used of user identification. + * @throws QpidException If creating a connection fails due to some internal error. */ - public ConnectionImpl(String host,int port,String virtualHost,String username,String password) throws QpidException + protected ConnectionImpl(String host, int port, String virtualHost, String username, String password) + throws QpidException { _qpidConnection = Client.createConnection(); _qpidConnection.connect(host, port, virtualHost, username, password); @@ -428,8 +436,8 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect * @param acknowledgeMode Legal values are <code>Session.AUTO_ACKNOWLEDGE</code>, <code>Session.CLIENT_ACKNOWLEDGE</code>, and * <code>Session.DUPS_OK_ACKNOWLEDGE</code>. * @return a newly created topic session - * @throws JMSException If creating the session fails due to some internal error. - * @throws QpidException + * @throws JMSException If creating the session fails due to some internal error. + * @throws QpidException */ public synchronized TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException { @@ -499,10 +507,9 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect { return _qpidConnection; } - + public Reference getReference() throws NamingException { - return new Reference( ConnectionImpl.class.getName(), - new StringRefAddr(ConnectionImpl.class.getName(),"")); + return new Reference(ConnectionImpl.class.getName(), new StringRefAddr(ConnectionImpl.class.getName(), "")); } } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/DestinationImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/DestinationImpl.java index 1964d4e525..df585ab50e 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/DestinationImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/DestinationImpl.java @@ -18,7 +18,6 @@ package org.apache.qpidity.jms; import org.apache.qpidity.QpidException; -import org.apache.qpidity.Option; import org.apache.qpidity.url.BindingURL; import javax.jms.Destination; @@ -31,7 +30,7 @@ public class DestinationImpl implements Destination /** * The destination's name */ - protected String _name = null; + protected String _destinationName = null; /** * The session used to create this destination @@ -43,30 +42,43 @@ public class DestinationImpl implements Destination */ protected String _exchangeName; - /** + /** * The excahnge class */ - protected String _exchangeClass; + protected String _exchangeType; - /** - * The queu name + /** + * The queue name */ protected String _queueName; + /** + * Indicate whether this destination is exclusive + */ + protected boolean _isExclusive; + + /** + * Indicates whether this destination is auto delete. + */ + protected boolean _isAutoDelete; + + /** + * Indicates whether this destination is durable + */ + protected boolean _isDurable; + //--- Constructor /** - * Create a new DestinationImpl with a given name. + * Create a new DestinationImpl. * - * @param name The name of this destination. - * @param session The session used to create this destination. - * @throws QpidException If the destiantion name is not valid + * @param session The session used to create this DestinationImpl. */ - protected DestinationImpl(SessionImpl session, String name) throws QpidException + protected DestinationImpl(SessionImpl session) { - _session = session; - _name = name; + _session = session; } + /** * Create a destiantion from a binding URL * @@ -78,65 +90,103 @@ public class DestinationImpl implements Destination { _session = session; _exchangeName = binding.getExchangeName(); - _exchangeClass = binding.getExchangeClass(); - _name = binding.getDestinationName(); - // _isExclusive = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCLUSIVE)); - boolean isAutoDelete = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_AUTODELETE)); - boolean isDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE)); + _exchangeType = binding.getExchangeClass(); + _destinationName = binding.getDestinationName(); + _isExclusive = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCLUSIVE)); + _isAutoDelete = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_AUTODELETE)); + _isDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE)); _queueName = binding.getQueueName(); - // create this exchange - _session.getQpidSession().exchangeDeclare(_exchangeName, _exchangeClass, null, null, - isDurable ? Option.DURABLE : Option.NO_OPTION, - isAutoDelete ? Option.AUTO_DELETE : Option.NO_OPTION); } //---- Getters and Setters + /** + * Overrides Object.toString(); + * + * @return Stringified destination representation. + */ + public String toString() + { + return _destinationName; + } /** - * Gets the name of this destination. + * Get the destination name. * - * @return The destination name. + * @return The destination name */ - public String getName() + public String getDestinationName() { - return _name; + return _destinationName; } /** - * set the destination name - * <p> This name is not verified until producing or consuming messages for that destination. + * Get the session of this destination * - * @param name The destination name. + * @return The session of this destination */ - public void setName(String name) + public SessionImpl getSession() { - _name = name; + return _session; } /** - * Overrides Object.toString(); + * The exchange name * - * @return Stringified destination representation. + * @return The exchange name */ - public String toString() + public String getExchangeName() { - return _name; + return _exchangeName; + } + + /** + * The exchange type. + * + * @return The exchange type. + */ + public String getExchangeType() + { + return _exchangeType; } - // getter methods + /** + * The queue name. + * + * @return The queue name. + */ public String getQpidQueueName() { return _queueName; } - public String getExchangeName() + /** + * Indicates whether this destination is exclusive. + * + * @return true if this destination is exclusive. + */ + public boolean isExclusive() { - return _exchangeName; + return _isExclusive; + } + + /** + * Indicates whether this destination is AutoDelete. + * + * @return true if this destination is AutoDelete. + */ + public boolean isAutoDelete() + { + return _isAutoDelete; } - public String getExchangeClass() + /** + * Indicates whether this destination is Durable. + * + * @return true if this destination is Durable. + */ + public boolean isDurable() { - return _exchangeClass; + return _isDurable; } } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java index 6ea9a328e5..4cb649c71a 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java @@ -130,7 +130,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer { // this is a queue we expect that this queue exists getSession().getQpidSession() - .messageSubscribe(destination.getName(), getMessageActorID(), + .messageSubscribe(destination.getQpidQueueName(), getMessageActorID(), org.apache.qpidity.client.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED, // When the message selctor is set we do not acquire the messages _messageSelector != null ? org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE : org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE, @@ -163,7 +163,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer } // bind this queue with the topic exchange getSession().getQpidSession() - .queueBind(queueName, ExchangeDefaults.TOPIC_EXCHANGE_NAME, destination.getName(), null); + .queueBind(queueName, ExchangeDefaults.TOPIC_EXCHANGE_NAME, destination.getQpidQueueName(), null); // subscribe to this topic getSession().getQpidSession() .messageSubscribe(queueName, getMessageActorID(), @@ -592,7 +592,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer // TODO: messageID is a string but range need a long??? // ranges.add(message.getMessageID()); - getSession().getQpidSession().messageAcquire(ranges, org.apache.qpidity.client.Session.ACQUIRE_ANY_AVAILABLE_MESSAGE); + getSession().getQpidSession().messageAcquire(ranges, org.apache.qpidity.client.Session.MESSAGE_ACQUIRE_ANY_AVAILABLE_MESSAGE); RangeSet acquired = getSession().getQpidSession().getAccquiredMessages(); if (acquired.size() > 0) { diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java index 18dfebeabf..cc71bddafc 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java @@ -17,13 +17,13 @@ */ package org.apache.qpidity.jms; -import org.apache.qpidity.jms.message.QpidMessage; import org.apache.qpidity.jms.message.MessageHelper; import org.apache.qpidity.jms.message.MessageImpl; import org.apache.qpidity.QpidException; import javax.jms.*; import java.util.UUID; +import java.io.IOException; /** * Implements MessageProducer @@ -318,7 +318,7 @@ public class MessageProducerImpl extends MessageActor implements MessageProducer } // the messae UID String uid = (_disableMessageId) ? "MSG_ID_DISABLED" : UUID.randomUUID().toString(); - MessageImpl qpidMessage = null; + MessageImpl qpidMessage; // check that the message is not a foreign one try { @@ -358,6 +358,8 @@ public class MessageProducerImpl extends MessageActor implements MessageProducer qpidMessage.setJMSExpiration(timeToLive); } qpidMessage.setJMSTimestamp(currentTime); + qpidMessage.setRoutingKey(((DestinationImpl) destination).getDestinationName()); + qpidMessage.setExchangeName(((DestinationImpl) destination).getExchangeName()); // call beforeMessageDispatch try { @@ -367,6 +369,16 @@ public class MessageProducerImpl extends MessageActor implements MessageProducer { throw ExceptionHelper.convertQpidExceptionToJMSException(e); } - // todo getSession().getQpidSession().messageTransfer(((DestinationImpl) destination).getExchangeName(), message, Option); + try + { + getSession().getQpidSession().messageTransfer(qpidMessage.getExchangeName(), + qpidMessage.getQpidityMessage(), + org.apache.qpidity.client.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED, + org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE); + } + catch (IOException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } } } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueConnectionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueConnectionImpl.java new file mode 100644 index 0000000000..8d008bda7e --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/QueueConnectionImpl.java @@ -0,0 +1,37 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.jms; + +import org.apache.qpidity.jms.ConnectionImpl; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.client.Client; + +import javax.jms.QueueConnection; + +/** + * + * Implements javax.jms.QueueConnection + */ +public class QueueConnectionImpl extends ConnectionImpl implements QueueConnection +{ + //-- constructor + public QueueConnectionImpl(String host,int port,String virtualHost,String username,String password) throws QpidException + { + super(host, port, virtualHost, username, password); + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java index ce1d11c7e5..b9e1d5a9db 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java @@ -32,6 +32,17 @@ public class QueueImpl extends DestinationImpl implements Queue { //--- Constructor + + /** + * Create a new QueueImpl. + * + * @param session The session used to create this QueueImpl. + */ + protected QueueImpl(SessionImpl session) + { + super(session); + } + /** * Create a new QueueImpl with a given name. * @@ -41,25 +52,28 @@ public class QueueImpl extends DestinationImpl implements Queue */ protected QueueImpl(SessionImpl session, String name) throws QpidException { - super(session, name); - _exchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME; - _exchangeClass = ExchangeDefaults.DIRECT_EXCHANGE_CLASS; + super(session); _queueName = name; - // check that this queue exist on the server - // As pasive is set the server will not create the queue. - session.getQpidSession().queueDeclare(name, null, null, Option.PASSIVE); + _destinationName = name; + _exchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME; + _exchangeType = ExchangeDefaults.DIRECT_EXCHANGE_CLASS; + _isAutoDelete = false; + _isDurable = true; + _isExclusive = false; + registerQueue(false); } /** * Create a destiantion from a binding URL * - * @param session The session used to create this queue. + * @param session The session used to create this queue. * @param binding The URL * @throws QpidException If the URL is not valid */ protected QueueImpl(SessionImpl session, BindingURL binding) throws QpidException { - super(session, binding); + super(session, binding); + registerQueue(false); } //---- Interface javax.jms.Queue @@ -70,6 +84,35 @@ public class QueueImpl extends DestinationImpl implements Queue */ public String getQueueName() throws JMSException { - return super.getName(); + return _queueName; + } + + //---Private methods + /** + * Check that this queue exists and declare it if required. + * + * @param declare Specify whether the queue should be declared + * @throws QpidException If this queue does not exists on the broker. + */ + protected void registerQueue(boolean declare) throws QpidException + { + // test if this exchange exist on the broker + //todo we can also specify if the excahnge is autodlete and durable + _session.getQpidSession().exchangeDeclare(_exchangeName, _exchangeType, null, null, Option.PASSIVE); + // wait for the broker response + _session.getQpidSession().sync(); + // If this exchange does not exist then we will get an Expection 404 does notexist + //todo check for an execption + // now check if the queue exists + _session.getQpidSession().queueDeclare(_queueName, null, null, _isDurable ? Option.DURABLE : Option.NO_OPTION, + _isAutoDelete ? Option.AUTO_DELETE : Option.NO_OPTION, + _isExclusive ? Option.EXCLUSIVE : Option.NO_OPTION, + declare ? Option.PASSIVE : Option.NO_OPTION); + // wait for the broker response + _session.getQpidSession().sync(); + // If this queue does not exist then we will get an Expection 404 does notexist + _session.getQpidSession().queueBind(_queueName, _exchangeName, _destinationName, null); + // we don't have to sync as we don't expect an error } + } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/TemporaryQueueImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/TemporaryQueueImpl.java index fe94531046..22e28f5c42 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/TemporaryQueueImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/TemporaryQueueImpl.java @@ -18,8 +18,6 @@ package org.apache.qpidity.jms; import org.apache.qpidity.QpidException; -import org.apache.qpidity.Option; -import org.apache.qpidity.exchange.ExchangeDefaults; import javax.jms.TemporaryQueue; import javax.jms.JMSException; @@ -28,17 +26,16 @@ import java.util.UUID; /** * Implements TemporaryQueue */ -public class TemporaryQueueImpl extends DestinationImpl implements TemporaryQueue, TemporaryDestination +public class TemporaryQueueImpl extends QueueImpl implements TemporaryQueue, TemporaryDestination { /** * Indicates whether this temporary queue is deleted. */ - private boolean _isDeleted = false; + private boolean _isDeleted; //--- constructor - /** - * Create a new TemporaryQueueImpl with a given name. + * Create a new TemporaryQueueImpl. * * @param session The session used to create this TemporaryQueueImpl. * @throws QpidException If creating the TemporaryQueueImpl fails due to some error. @@ -46,14 +43,15 @@ public class TemporaryQueueImpl extends DestinationImpl implements TemporaryQueu protected TemporaryQueueImpl(SessionImpl session) throws QpidException { // temporary destinations do not have names - super(session, "NAME_NOT_SET"); - _exchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME; - _exchangeClass = ExchangeDefaults.FANOUT_EXCHANGE_CLASS; + super(session); _queueName = "TempQueue-" + UUID.randomUUID(); - // check that this queue exist on the server - // As pasive is set the server will not create the queue. - session.getQpidSession().queueDeclare(_queueName, null, null, Option.AUTO_DELETE); - session.getQpidSession().queueBind(_queueName, _exchangeName, _queueName, null); + _destinationName = _queueName; + _isAutoDelete = false; + _isDurable = false; + _isExclusive = false; + _isDeleted = false; + // we must create this queue + registerQueue(true); } //-- TemporaryDestination Interface @@ -67,7 +65,7 @@ public class TemporaryQueueImpl extends DestinationImpl implements TemporaryQueu return _isDeleted; } - //-- TemporaryTopic Interface + //-- TemporaryQueue Interface /** * Delete this temporary destinaiton * @@ -75,18 +73,12 @@ public class TemporaryQueueImpl extends DestinationImpl implements TemporaryQueu */ public void delete() throws JMSException { - // todo delete this temporary queue + if (!_isDeleted) + { + _session.getQpidSession().queueDelete(_queueName); + } _isDeleted = true; } - //---- Interface javax.jms.Queue - /** - * Gets the name of this queue. - * - * @return This queue's name. - */ - public String getQueueName() throws JMSException - { - return super.getName(); - } } + diff --git a/java/client/src/main/java/org/apache/qpidity/jms/TemporaryTopicImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/TemporaryTopicImpl.java index e103c9dad1..092ee15b2f 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/TemporaryTopicImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/TemporaryTopicImpl.java @@ -21,6 +21,7 @@ import org.apache.qpidity.QpidException; import javax.jms.TemporaryTopic; import javax.jms.JMSException; +import java.util.UUID; /** @@ -43,7 +44,7 @@ public class TemporaryTopicImpl extends TopicImpl implements TemporaryTopic, Tem protected TemporaryTopicImpl(SessionImpl session) throws QpidException { // temporary destinations do not have names. - super(session, "NAME_NOT_SET"); + super(session, "TemporayTopic-" + UUID.randomUUID()); } //-- TemporaryDestination Interface diff --git a/java/client/src/main/java/org/apache/qpidity/jms/TopicConnectionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/TopicConnectionImpl.java new file mode 100644 index 0000000000..f8f19a9e33 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/TopicConnectionImpl.java @@ -0,0 +1,35 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.jms; + +import org.apache.qpidity.QpidException; + +import javax.jms.TopicConnection; + +/** + * Implements javax.jms.TopicConnection + */ +public class TopicConnectionImpl extends ConnectionImpl implements TopicConnection +{ + //-- constructor + public TopicConnectionImpl(String host, int port, String virtualHost, String username, String password) + throws QpidException + { + super(host, port, virtualHost, username, password); + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java index a439c514d4..4fc0a28ecb 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java @@ -18,10 +18,12 @@ package org.apache.qpidity.jms; import org.apache.qpidity.QpidException; +import org.apache.qpidity.Option; import org.apache.qpidity.exchange.ExchangeDefaults; import org.apache.qpidity.url.BindingURL; import javax.jms.Topic; +import java.util.UUID; /** * Implementation of the javax.jms.Topic interface. @@ -38,9 +40,15 @@ public class TopicImpl extends DestinationImpl implements Topic */ public TopicImpl(SessionImpl session, String name) throws QpidException { - super(session, name); + super(session); + _queueName = "Topic-" + UUID.randomUUID(); + _destinationName = name; _exchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME; - _exchangeClass = ExchangeDefaults.TOPIC_EXCHANGE_CLASS; + _exchangeType = ExchangeDefaults.TOPIC_EXCHANGE_CLASS; + _isAutoDelete = true; + _isDurable = false; + _isExclusive = true; + checkTopicExists(); } /** @@ -53,6 +61,7 @@ public class TopicImpl extends DestinationImpl implements Topic protected TopicImpl(SessionImpl session, BindingURL binding) throws QpidException { super(session, binding); + checkTopicExists(); } //--- javax.jsm.Topic Interface @@ -63,7 +72,20 @@ public class TopicImpl extends DestinationImpl implements Topic */ public String getTopicName() { - return super.getName(); + return _destinationName; } + /** + * Check that this topic exchange + * + * @throws QpidException If this queue does not exists on the broker. + */ + protected void checkTopicExists() throws QpidException + { + // test if this exchange exist on the broker + _session.getQpidSession().exchangeDeclare(_exchangeName, _exchangeType, null, null, Option.PASSIVE); + // wait for the broker response + _session.getQpidSession().sync(); + // todo get the exception + } } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java index 3863adb07a..1a7fae5680 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java @@ -28,11 +28,23 @@ import javax.jms.XASession; */ public class XAConnectionImpl extends ConnectionImpl implements XAConnection { - public XAConnectionImpl(String host, int port, String virtualHost, String username, String password) throws QpidException + //-- constructor + /** + * Create a XAConnection. + * + * @param host The broker host name. + * @param port The port on which the broker is listening for connection. + * @param virtualHost The virtual host on which the broker is deployed. + * @param username The user name used of user identification. + * @param password The password name used of user identification. + * @throws QpidException If creating a connection fails due to some internal error. + */ + protected XAConnectionImpl(String host, int port, String virtualHost, String username, String password) throws QpidException { super(host, port, virtualHost, username, password); } + //-- interface XAConnection /** * Creates an XASession. * diff --git a/java/client/src/main/java/org/apache/qpidity/jms/XAQueueConnectionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/XAQueueConnectionImpl.java new file mode 100644 index 0000000000..55d46c874b --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/XAQueueConnectionImpl.java @@ -0,0 +1,72 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.jms; + +import org.apache.qpidity.QpidException; + +import javax.jms.XAQueueConnection; +import javax.jms.JMSException; +import javax.jms.XAQueueSession; + +/** + * Implements XAQueueConnection + */ +public class XAQueueConnectionImpl extends XAConnectionImpl implements XAQueueConnection +{ + //-- constructor + /** + * Create a XAQueueConnection. + * + * @param host The broker host name. + * @param port The port on which the broker is listening for connection. + * @param virtualHost The virtual host on which the broker is deployed. + * @param username The user name used of user identification. + * @param password The password name used of user identification. + * @throws QpidException If creating a XAQueueConnection fails due to some internal error. + */ + public XAQueueConnectionImpl(String host, int port, String virtualHost, String username, String password) + throws QpidException + { + super(host, port, virtualHost, username, password); + } + + //-- Interface XAQueueConnection + /** + * Creates an XAQueueSession. + * + * @return A newly created XASession. + * @throws JMSException If the XAQueueConnectionImpl fails to create an XASession due to + * some internal error. + */ + public synchronized XAQueueSession createXAQueueSession() throws JMSException + { + checkNotClosed(); + XAQueueSessionImpl xaQueueSession; + try + { + xaQueueSession = new XAQueueSessionImpl(this); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + // add this session with the list of session that are handled by this connection + _sessions.add(xaQueueSession); + return xaQueueSession; + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/XAQueueSessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/XAQueueSessionImpl.java new file mode 100644 index 0000000000..86aad99064 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/XAQueueSessionImpl.java @@ -0,0 +1,64 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.jms; + +import org.apache.qpidity.QpidException; + +import javax.jms.QueueSession; +import javax.jms.JMSException; +import javax.jms.XAQueueSession; + +/** + * Imeplements javax.jms.XAQueueSessionImpl + */ +public class XAQueueSessionImpl extends XASessionImpl implements XAQueueSession +{ + /** + * The standard session + */ + private QueueSession _jmsQueueSession; + + //-- Constructors + /** + * Create a JMS XAQueueSessionImpl + * + * @param connection The ConnectionImpl object from which the Session is created. + * @throws org.apache.qpidity.QpidException + * In case of internal error. + */ + protected XAQueueSessionImpl(ConnectionImpl connection) throws QpidException + { + super(connection); + } + + //--- interface XAQueueSession + /** + * Gets the topic session associated with this <CODE>XATopicSession</CODE>. + * + * @return the topic session object + * @throws JMSException If an internal error occurs. + */ + public QueueSession getQueueSession() throws JMSException + { + if (_jmsQueueSession == null) + { + _jmsQueueSession = getConnection().createQueueSession(true, getAcknowledgeMode()); + } + return _jmsQueueSession; + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/XASessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/XASessionImpl.java index a1d5a345c2..abd8bd9c79 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/XASessionImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/XASessionImpl.java @@ -41,6 +41,11 @@ public class XASessionImpl extends SessionImpl implements XASession */ private DtxSession _qpidDtxSession; + /** + * The standard session + */ + private Session _jmsSession; + //-- Constructors /** * Create a JMS XASession @@ -62,13 +67,16 @@ public class XASessionImpl extends SessionImpl implements XASession /** * Gets the session associated with this XASession. * - * @return the session object + * @return The session object. * @throws JMSException if an internal error occurs. - * @since 1.1 */ public Session getSession() throws JMSException { - return this; + if( _jmsSession == null ) + { + _jmsSession = getConnection().createSession(true, getAcknowledgeMode()); + } + return _jmsSession; } /** diff --git a/java/client/src/main/java/org/apache/qpidity/jms/XATopicConnectionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/XATopicConnectionImpl.java new file mode 100644 index 0000000000..3dddc38e23 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/XATopicConnectionImpl.java @@ -0,0 +1,71 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.jms; + +import org.apache.qpidity.QpidException; + +import javax.jms.XATopicConnection; +import javax.jms.JMSException; +import javax.jms.XATopicSession; + +/** + * implements javax.jms.XATopicConnection + */ +public class XATopicConnectionImpl extends XAConnectionImpl implements XATopicConnection +{ + //-- constructor + /** + * Create a XATopicConnection. + * + * @param host The broker host name. + * @param port The port on which the broker is listening for connection. + * @param virtualHost The virtual host on which the broker is deployed. + * @param username The user name used of user identification. + * @param password The password name used of user identification. + * @throws QpidException If creating a XATopicConnection fails due to some internal error. + */ + public XATopicConnectionImpl(String host, int port, String virtualHost, String username, String password) + throws QpidException + { + super(host, port, virtualHost, username, password); + } + + /** + * Creates an XATopicSession. + * + * @return A newly created XATopicSession. + * @throws JMSException If the XAConnectiono fails to create an XATopicSession due to + * some internal error. + */ + public synchronized XATopicSession createXATopicSession() throws JMSException + { + checkNotClosed(); + XATopicSessionImpl xaTopicSession; + try + { + xaTopicSession = new XATopicSessionImpl(this); + } + catch (QpidException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + // add this session with the list of session that are handled by this connection + _sessions.add(xaTopicSession); + return xaTopicSession; + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/XATopicSessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/XATopicSessionImpl.java new file mode 100644 index 0000000000..3fc948901c --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/XATopicSessionImpl.java @@ -0,0 +1,63 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpidity.jms; + +import org.apache.qpidity.QpidException; + +import javax.jms.*; + +/** + * Implements javax.jms.XATopicSession and javax.jms.TopicSession + */ +public class XATopicSessionImpl extends XASessionImpl implements XATopicSession +{ + /** + * The standard session + */ + private TopicSession _jmsTopicSession; + + //-- Constructors + /** + * Create a JMS XASession + * + * @param connection The ConnectionImpl object from which the Session is created. + * @throws org.apache.qpidity.QpidException + * In case of internal error. + */ + protected XATopicSessionImpl(ConnectionImpl connection) throws QpidException + { + super(connection); + } + + //--- interface XATopicSession + + /** + * Gets the topic session associated with this <CODE>XATopicSession</CODE>. + * + * @return the topic session object + * @throws JMSException If an internal error occurs. + */ + public TopicSession getTopicSession() throws JMSException + { + if (_jmsTopicSession == null) + { + _jmsTopicSession = getConnection().createTopicSession(true, getAcknowledgeMode()); + } + return _jmsTopicSession; + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java index 7d5ae755f5..1bd93d792d 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java @@ -885,7 +885,7 @@ public class MessageImpl extends QpidMessage implements Message /** * This method is invoked after this message is received. * - * @throws QpidException If there is an internal error when procesing this message. + * @throws QpidException If there is an internal error when procesing this message. */ public void afterMessageReceive() throws QpidException { @@ -922,4 +922,5 @@ public class MessageImpl extends QpidMessage implements Message { _messageConsumer = messageConsumer; } + } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java b/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java index 7c46e191ef..2b7316e847 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java @@ -348,6 +348,35 @@ public class QpidMessage } /** + * Set this message AMQP routingkey + * + * @param routingKey This message AMQP routingkey + */ + public void setRoutingKey(String routingKey) + { + _qpidityMessage.getDeliveryProperties().setRoutingKey(routingKey); + } + + /** + * Set this message AMQP exchange name. + * + * @param exchangeName This message AMQP exchange name. + */ + public void setExchangeName(String exchangeName) + { + _qpidityMessage.getDeliveryProperties().setExchange(exchangeName); + } + + /** + * Get this message excahgne name + * @return this message excahgne name + */ + public String getExchangeName() + { + return _qpidityMessage.getDeliveryProperties().getExchange(); + } + + /** * This method is invoked before a message dispatch operation. * * @throws QpidException If the destination is not set @@ -368,6 +397,16 @@ public class QpidMessage throw new QpidException("IO exception when sending message", ErrorCode.UNDEFINED, e); } } + + /** + * Get the underlying qpidity message + * + * @return The underlying qpidity message. + */ + public org.apache.qpidity.api.Message getQpidityMessage() + { + return _qpidityMessage; + } } |