summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/Session.java2
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/ConnectionFactoryImpl.java382
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java25
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/DestinationImpl.java130
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java6
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java18
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/QueueConnectionImpl.java37
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java61
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/TemporaryQueueImpl.java42
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/TemporaryTopicImpl.java3
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/TopicConnectionImpl.java35
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java28
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java14
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/XAQueueConnectionImpl.java72
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/XAQueueSessionImpl.java64
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/XASessionImpl.java14
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/XATopicConnectionImpl.java71
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/XATopicSessionImpl.java63
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java3
-rw-r--r--java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java39
20 files changed, 978 insertions, 131 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/client/Session.java b/java/client/src/main/java/org/apache/qpidity/client/Session.java
index 42f03acc7a..fa3bdf0934 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/Session.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/Session.java
@@ -568,7 +568,7 @@ public interface Session
* @param type Each exchange belongs to one of a set of exchange types implemented by the server. The
* exchange types define the functionality of the exchange - i.e. how messages are routed
* through it. It is not valid or meaningful to attempt to change the type of an existing
- * exchange. Default exchange types are: direct, topic, headers and fanout.
+ * exchange. Default exchange types are: direct, topic, headers and fanout.
* @param alternateExchange In the event that a message cannot be routed, this is the name of the exchange to which
* the message will be sent.
* @param options Set of options (valid options are: {@link Option#AUTO_DELETE}, {@link Option#DURABLE},
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/ConnectionFactoryImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/ConnectionFactoryImpl.java
index 6a786e3edb..2c8ce94e27 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/ConnectionFactoryImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/ConnectionFactoryImpl.java
@@ -1,100 +1,418 @@
package org.apache.qpidity.jms;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
-import javax.jms.QueueConnection;
-import javax.jms.QueueConnectionFactory;
-import javax.jms.TopicConnection;
-import javax.jms.TopicConnectionFactory;
+import javax.jms.*;
import javax.naming.NamingException;
import javax.naming.Reference;
import javax.naming.Referenceable;
import javax.naming.StringRefAddr;
import org.apache.qpidity.QpidException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class ConnectionFactoryImpl implements ConnectionFactory,QueueConnectionFactory, TopicConnectionFactory, Referenceable
-{
+public class ConnectionFactoryImpl implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory,
+ XATopicConnectionFactory, XAQueueConnectionFactory, XAConnectionFactory,
+ Referenceable
+{
+ /**
+ * this ConnectionFactoryImpl's logger
+ */
+ private static final Logger _logger = LoggerFactory.getLogger(ConnectionFactoryImpl.class);
+
+ /**
+ * The virtual host on which the broker is deployed.
+ */
private String _host;
+ /**
+ * The port on which the broker is listening for connection.
+ */
private int _port;
+ /**
+ * The default user name used of user identification.
+ */
private String _defaultUsername;
+ /**
+ * The default password used of user identification.
+ */
private String _defaultPassword;
- private String _virtualPath;
+ /**
+ * The virtual host on which the broker is deployed.
+ */
+ private String _virtualHost;
+ /**
+ * The URL used to build this factory, (not yet supported)
+ */
private String _url;
-
+
// Undefined at the moment
public ConnectionFactoryImpl(String url)
{
_url = url;
}
-
- public ConnectionFactoryImpl(String host,int port,String virtualHost,String defaultUsername,String defaultPassword)
+
+ /**
+ * Create a connection.
+ *
+ * @param host The broker host name.
+ * @param port The port on which the broker is listening for connection.
+ * @param virtualHost The virtual host on which the broker is deployed.
+ * @param defaultUsername The user name used of user identification.
+ * @param defaultPassword The password used of user identification.
+ */
+ public ConnectionFactoryImpl(String host, int port, String virtualHost, String defaultUsername,
+ String defaultPassword)
{
_host = host;
_port = port;
_defaultUsername = defaultUsername;
_defaultPassword = defaultPassword;
- _virtualPath = virtualHost;
+ _virtualHost = virtualHost;
}
-
+
+ //-- Interface ConnectionFactory
+
+ /**
+ * Creates a connection with the default user identity.
+ * <p> The connection is created in stopped mode. No messages
+ * will be delivered until the <code>Connection.start</code> method
+ * is explicitly called.
+ *
+ * @return A newly created connection.
+ * @throws JMSException If creating the connection fails due to some internal error.
+ * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+ */
public Connection createConnection() throws JMSException
- {
+ {
try
{
- return new ConnectionImpl(_host,_port,_virtualPath,_defaultUsername,_defaultPassword);
+ return new ConnectionImpl(_host, _port, _virtualHost, _defaultUsername, _defaultPassword);
}
- catch(QpidException e)
+ catch (QpidException e)
{
- // need to convert the qpid exception into jms exception
- throw new JMSException("","");
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("PRoblem when creating connection", e);
+ }
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
}
}
+ /**
+ * Creates a connection with the specified user identity.
+ * <p> The connection is created in stopped mode. No messages
+ * will be delivered until the <code>Connection.start</code> method
+ * is explicitly called.
+ *
+ * @param username the caller's user name
+ * @param password the caller's password
+ * @return A newly created connection.
+ * @throws JMSException If creating the connection fails due to some internal error.
+ * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+ */
public Connection createConnection(String username, String password) throws JMSException
{
try
{
- return new ConnectionImpl(_host,_port,_virtualPath,username,password);
+ return new ConnectionImpl(_host, _port, _virtualHost, username, password);
}
- catch(QpidException e)
+ catch (QpidException e)
{
- // need to convert the qpid exception into jms exception
- throw new JMSException("","");
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("PRoblem when creating connection", e);
+ }
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
}
}
// ----------------------------------------
// Support for JMS 1.0 classes
// ----------------------------------------
+ //--- Interface QueueConnection
+ /**
+ * Creates a queueConnection with the default user identity.
+ * <p> The queueConnection is created in stopped mode. No messages
+ * will be delivered until the <code>Connection.start</code> method
+ * is explicitly called.
+ *
+ * @return A newly created queueConnection
+ * @throws JMSException If creating the queueConnection fails due to some internal error.
+ * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+ */
public QueueConnection createQueueConnection() throws JMSException
{
- return (QueueConnection) createConnection();
+ try
+ {
+ return new QueueConnectionImpl(_host, _port, _virtualHost, _defaultUsername, _defaultPassword);
+ }
+ catch (QpidException e)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("PRoblem when creating connection", e);
+ }
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
}
+ /**
+ * Creates a queueConnection with the specified user identity.
+ * <p> The queueConnection is created in stopped mode. No messages
+ * will be delivered until the <code>Connection.start</code> method
+ * is explicitly called.
+ *
+ * @param username the caller's user name
+ * @param password the caller's password
+ * @return A newly created queueConnection.
+ * @throws JMSException If creating the queueConnection fails due to some internal error.
+ * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+ */
public QueueConnection createQueueConnection(String username, String password) throws JMSException
{
- return (QueueConnection) createConnection(username, password);
+ try
+ {
+ return new QueueConnectionImpl(_host, _port, _virtualHost, username, password);
+ }
+ catch (QpidException e)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("PRoblem when creating connection", e);
+ }
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
}
+ //--- Interface TopicConnection
+ /**
+ * Creates a topicConnection with the default user identity.
+ * <p> The topicConnection is created in stopped mode. No messages
+ * will be delivered until the <code>Connection.start</code> method
+ * is explicitly called.
+ *
+ * @return A newly created topicConnection
+ * @throws JMSException If creating the topicConnection fails due to some internal error.
+ * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+ */
public TopicConnection createTopicConnection() throws JMSException
{
- return (TopicConnection) createConnection();
+ try
+ {
+ return new TopicConnectionImpl(_host, _port, _virtualHost, _defaultUsername, _defaultPassword);
+ }
+ catch (QpidException e)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("PRoblem when creating connection", e);
+ }
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
}
+ /**
+ * Creates a topicConnection with the specified user identity.
+ * <p> The topicConnection is created in stopped mode. No messages
+ * will be delivered until the <code>Connection.start</code> method
+ * is explicitly called.
+ *
+ * @param username the caller's user name
+ * @param password the caller's password
+ * @return A newly created topicConnection.
+ * @throws JMSException If creating the topicConnection fails due to some internal error.
+ * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+ */
public TopicConnection createTopicConnection(String username, String password) throws JMSException
{
- return (TopicConnection) createConnection(username, password);
+ try
+ {
+ return new TopicConnectionImpl(_host, _port, _virtualHost, username, password);
+ }
+ catch (QpidException e)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("PRoblem when creating connection", e);
+ }
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ }
+
+ // ---------------------------------------------------------------------------------------------------
+ // the following methods are provided for XA compatibility
+ // ---------------------------------------------------------------------------------------------------
+
+ /**
+ * Creates a XAConnection with the default user identity.
+ * <p> The XAConnection is created in stopped mode. No messages
+ * will be delivered until the <code>Connection.start</code> method
+ * is explicitly called.
+ *
+ * @return A newly created XAConnection
+ * @throws JMSException If creating the XAConnection fails due to some internal error.
+ * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+ */
+ public XAConnection createXAConnection() throws JMSException
+ {
+ try
+ {
+ return new XAConnectionImpl(_host, _port, _virtualHost, _defaultUsername, _defaultPassword);
+ }
+ catch (QpidException e)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("PRoblem when creating connection", e);
+ }
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ }
+
+ /**
+ * Creates a XAConnection with the specified user identity.
+ * <p> The XAConnection is created in stopped mode. No messages
+ * will be delivered until the <code>Connection.start</code> method
+ * is explicitly called.
+ *
+ * @param username the caller's user name
+ * @param password the caller's password
+ * @return A newly created XAConnection.
+ * @throws JMSException If creating the XAConnection fails due to some internal error.
+ * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+ */
+ public XAConnection createXAConnection(String username, String password) throws JMSException
+ {
+ try
+ {
+ return new XAConnectionImpl(_host, _port, _virtualHost, username, password);
+ }
+ catch (QpidException e)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("PRoblem when creating connection", e);
+ }
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ }
+
+
+ /**
+ * Creates a XATopicConnection with the default user identity.
+ * <p> The XATopicConnection is created in stopped mode. No messages
+ * will be delivered until the <code>Connection.start</code> method
+ * is explicitly called.
+ *
+ * @return A newly created XATopicConnection
+ * @throws JMSException If creating the XATopicConnection fails due to some internal error.
+ * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+ */
+ public XATopicConnection createXATopicConnection() throws JMSException
+ {
+ try
+ {
+ return new XATopicConnectionImpl(_host, _port, _virtualHost, _defaultUsername, _defaultPassword);
+ }
+ catch (QpidException e)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("PRoblem when creating connection", e);
+ }
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
}
-
-
+
+ /**
+ * Creates a XATopicConnection with the specified user identity.
+ * <p> The XATopicConnection is created in stopped mode. No messages
+ * will be delivered until the <code>Connection.start</code> method
+ * is explicitly called.
+ *
+ * @param username the caller's user name
+ * @param password the caller's password
+ * @return A newly created XATopicConnection.
+ * @throws JMSException If creating the XATopicConnection fails due to some internal error.
+ * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+ */
+ public XATopicConnection createXATopicConnection(String username, String password) throws JMSException
+ {
+ try
+ {
+ return new XATopicConnectionImpl(_host, _port, _virtualHost, username, password);
+ }
+ catch (QpidException e)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("PRoblem when creating connection", e);
+ }
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ }
+
+ /**
+ * Creates a XAQueueConnection with the default user identity.
+ * <p> The XAQueueConnection is created in stopped mode. No messages
+ * will be delivered until the <code>Connection.start</code> method
+ * is explicitly called.
+ *
+ * @return A newly created XAQueueConnection
+ * @throws JMSException If creating the XAQueueConnection fails due to some internal error.
+ * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+ */
+ public XAQueueConnection createXAQueueConnection() throws JMSException
+ {
+ try
+ {
+ return new XAQueueConnectionImpl(_host, _port, _virtualHost, _defaultUsername, _defaultPassword);
+ }
+ catch (QpidException e)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("PRoblem when creating connection", e);
+ }
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ }
+
+ /**
+ * Creates a XAQueueConnection with the specified user identity.
+ * <p> The XAQueueConnection is created in stopped mode. No messages
+ * will be delivered until the <code>Connection.start</code> method
+ * is explicitly called.
+ *
+ * @param username the caller's user name
+ * @param password the caller's password
+ * @return A newly created XAQueueConnection.
+ * @throws JMSException If creating the XAQueueConnection fails due to some internal error.
+ * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+ */
+ public XAQueueConnection createXAQueueConnection(String username, String password) throws JMSException
+ {
+ try
+ {
+ return new XAQueueConnectionImpl(_host, _port, _virtualHost, username, password);
+ }
+ catch (QpidException e)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("PRoblem when creating connection", e);
+ }
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ }
+
// ----------------------------------------
// Support for JNDI
// ----------------------------------------
+
public Reference getReference() throws NamingException
{
- return new Reference( ConnectionFactoryImpl.class.getName(),
- new StringRefAddr(ConnectionFactoryImpl.class.getName(),_url));
+ return new Reference(ConnectionFactoryImpl.class.getName(),
+ new StringRefAddr(ConnectionFactoryImpl.class.getName(), _url));
}
}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java
index 656a8cddd3..60dc126dcf 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java
@@ -46,9 +46,9 @@ import org.slf4j.LoggerFactory;
/**
- * Implements javax.jms.Connection, javax.jms.QueueConnection adn javax.jms.TopicConnection
+ * Implements javax.jms.Connection, javax.jms.QueueConnection and javax.jms.TopicConnection
*/
-public class ConnectionImpl implements Connection, QueueConnection, TopicConnection, Referenceable
+public class ConnectionImpl implements Connection, Referenceable
{
/**
* This class's logger
@@ -112,9 +112,17 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
//------ Constructors ---//
/**
- * TODO define the parameters
+ * Create a connection.
+ *
+ * @param host The broker host name.
+ * @param port The port on which the broker is listening for connection.
+ * @param virtualHost The virtual host on which the broker is deployed.
+ * @param username The user name used of user identification.
+ * @param password The password name used of user identification.
+ * @throws QpidException If creating a connection fails due to some internal error.
*/
- public ConnectionImpl(String host,int port,String virtualHost,String username,String password) throws QpidException
+ protected ConnectionImpl(String host, int port, String virtualHost, String username, String password)
+ throws QpidException
{
_qpidConnection = Client.createConnection();
_qpidConnection.connect(host, port, virtualHost, username, password);
@@ -428,8 +436,8 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
* @param acknowledgeMode Legal values are <code>Session.AUTO_ACKNOWLEDGE</code>, <code>Session.CLIENT_ACKNOWLEDGE</code>, and
* <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
* @return a newly created topic session
- * @throws JMSException If creating the session fails due to some internal error.
- * @throws QpidException
+ * @throws JMSException If creating the session fails due to some internal error.
+ * @throws QpidException
*/
public synchronized TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException
{
@@ -499,10 +507,9 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect
{
return _qpidConnection;
}
-
+
public Reference getReference() throws NamingException
{
- return new Reference( ConnectionImpl.class.getName(),
- new StringRefAddr(ConnectionImpl.class.getName(),""));
+ return new Reference(ConnectionImpl.class.getName(), new StringRefAddr(ConnectionImpl.class.getName(), ""));
}
}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/DestinationImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/DestinationImpl.java
index 1964d4e525..df585ab50e 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/DestinationImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/DestinationImpl.java
@@ -18,7 +18,6 @@
package org.apache.qpidity.jms;
import org.apache.qpidity.QpidException;
-import org.apache.qpidity.Option;
import org.apache.qpidity.url.BindingURL;
import javax.jms.Destination;
@@ -31,7 +30,7 @@ public class DestinationImpl implements Destination
/**
* The destination's name
*/
- protected String _name = null;
+ protected String _destinationName = null;
/**
* The session used to create this destination
@@ -43,30 +42,43 @@ public class DestinationImpl implements Destination
*/
protected String _exchangeName;
- /**
+ /**
* The excahnge class
*/
- protected String _exchangeClass;
+ protected String _exchangeType;
- /**
- * The queu name
+ /**
+ * The queue name
*/
protected String _queueName;
+ /**
+ * Indicate whether this destination is exclusive
+ */
+ protected boolean _isExclusive;
+
+ /**
+ * Indicates whether this destination is auto delete.
+ */
+ protected boolean _isAutoDelete;
+
+ /**
+ * Indicates whether this destination is durable
+ */
+ protected boolean _isDurable;
+
//--- Constructor
/**
- * Create a new DestinationImpl with a given name.
+ * Create a new DestinationImpl.
*
- * @param name The name of this destination.
- * @param session The session used to create this destination.
- * @throws QpidException If the destiantion name is not valid
+ * @param session The session used to create this DestinationImpl.
*/
- protected DestinationImpl(SessionImpl session, String name) throws QpidException
+ protected DestinationImpl(SessionImpl session)
{
- _session = session;
- _name = name;
+ _session = session;
}
+
/**
* Create a destiantion from a binding URL
*
@@ -78,65 +90,103 @@ public class DestinationImpl implements Destination
{
_session = session;
_exchangeName = binding.getExchangeName();
- _exchangeClass = binding.getExchangeClass();
- _name = binding.getDestinationName();
- // _isExclusive = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCLUSIVE));
- boolean isAutoDelete = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_AUTODELETE));
- boolean isDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE));
+ _exchangeType = binding.getExchangeClass();
+ _destinationName = binding.getDestinationName();
+ _isExclusive = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCLUSIVE));
+ _isAutoDelete = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_AUTODELETE));
+ _isDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE));
_queueName = binding.getQueueName();
- // create this exchange
- _session.getQpidSession().exchangeDeclare(_exchangeName, _exchangeClass, null, null,
- isDurable ? Option.DURABLE : Option.NO_OPTION,
- isAutoDelete ? Option.AUTO_DELETE : Option.NO_OPTION);
}
//---- Getters and Setters
+ /**
+ * Overrides Object.toString();
+ *
+ * @return Stringified destination representation.
+ */
+ public String toString()
+ {
+ return _destinationName;
+ }
/**
- * Gets the name of this destination.
+ * Get the destination name.
*
- * @return The destination name.
+ * @return The destination name
*/
- public String getName()
+ public String getDestinationName()
{
- return _name;
+ return _destinationName;
}
/**
- * set the destination name
- * <p> This name is not verified until producing or consuming messages for that destination.
+ * Get the session of this destination
*
- * @param name The destination name.
+ * @return The session of this destination
*/
- public void setName(String name)
+ public SessionImpl getSession()
{
- _name = name;
+ return _session;
}
/**
- * Overrides Object.toString();
+ * The exchange name
*
- * @return Stringified destination representation.
+ * @return The exchange name
*/
- public String toString()
+ public String getExchangeName()
{
- return _name;
+ return _exchangeName;
+ }
+
+ /**
+ * The exchange type.
+ *
+ * @return The exchange type.
+ */
+ public String getExchangeType()
+ {
+ return _exchangeType;
}
- // getter methods
+ /**
+ * The queue name.
+ *
+ * @return The queue name.
+ */
public String getQpidQueueName()
{
return _queueName;
}
- public String getExchangeName()
+ /**
+ * Indicates whether this destination is exclusive.
+ *
+ * @return true if this destination is exclusive.
+ */
+ public boolean isExclusive()
{
- return _exchangeName;
+ return _isExclusive;
+ }
+
+ /**
+ * Indicates whether this destination is AutoDelete.
+ *
+ * @return true if this destination is AutoDelete.
+ */
+ public boolean isAutoDelete()
+ {
+ return _isAutoDelete;
}
- public String getExchangeClass()
+ /**
+ * Indicates whether this destination is Durable.
+ *
+ * @return true if this destination is Durable.
+ */
+ public boolean isDurable()
{
- return _exchangeClass;
+ return _isDurable;
}
}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
index 6ea9a328e5..4cb649c71a 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
@@ -130,7 +130,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
{
// this is a queue we expect that this queue exists
getSession().getQpidSession()
- .messageSubscribe(destination.getName(), getMessageActorID(),
+ .messageSubscribe(destination.getQpidQueueName(), getMessageActorID(),
org.apache.qpidity.client.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
// When the message selctor is set we do not acquire the messages
_messageSelector != null ? org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE : org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
@@ -163,7 +163,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
}
// bind this queue with the topic exchange
getSession().getQpidSession()
- .queueBind(queueName, ExchangeDefaults.TOPIC_EXCHANGE_NAME, destination.getName(), null);
+ .queueBind(queueName, ExchangeDefaults.TOPIC_EXCHANGE_NAME, destination.getQpidQueueName(), null);
// subscribe to this topic
getSession().getQpidSession()
.messageSubscribe(queueName, getMessageActorID(),
@@ -592,7 +592,7 @@ public class MessageConsumerImpl extends MessageActor implements MessageConsumer
// TODO: messageID is a string but range need a long???
// ranges.add(message.getMessageID());
- getSession().getQpidSession().messageAcquire(ranges, org.apache.qpidity.client.Session.ACQUIRE_ANY_AVAILABLE_MESSAGE);
+ getSession().getQpidSession().messageAcquire(ranges, org.apache.qpidity.client.Session.MESSAGE_ACQUIRE_ANY_AVAILABLE_MESSAGE);
RangeSet acquired = getSession().getQpidSession().getAccquiredMessages();
if (acquired.size() > 0)
{
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java
index 18dfebeabf..cc71bddafc 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java
@@ -17,13 +17,13 @@
*/
package org.apache.qpidity.jms;
-import org.apache.qpidity.jms.message.QpidMessage;
import org.apache.qpidity.jms.message.MessageHelper;
import org.apache.qpidity.jms.message.MessageImpl;
import org.apache.qpidity.QpidException;
import javax.jms.*;
import java.util.UUID;
+import java.io.IOException;
/**
* Implements MessageProducer
@@ -318,7 +318,7 @@ public class MessageProducerImpl extends MessageActor implements MessageProducer
}
// the messae UID
String uid = (_disableMessageId) ? "MSG_ID_DISABLED" : UUID.randomUUID().toString();
- MessageImpl qpidMessage = null;
+ MessageImpl qpidMessage;
// check that the message is not a foreign one
try
{
@@ -358,6 +358,8 @@ public class MessageProducerImpl extends MessageActor implements MessageProducer
qpidMessage.setJMSExpiration(timeToLive);
}
qpidMessage.setJMSTimestamp(currentTime);
+ qpidMessage.setRoutingKey(((DestinationImpl) destination).getDestinationName());
+ qpidMessage.setExchangeName(((DestinationImpl) destination).getExchangeName());
// call beforeMessageDispatch
try
{
@@ -367,6 +369,16 @@ public class MessageProducerImpl extends MessageActor implements MessageProducer
{
throw ExceptionHelper.convertQpidExceptionToJMSException(e);
}
- // todo getSession().getQpidSession().messageTransfer(((DestinationImpl) destination).getExchangeName(), message, Option);
+ try
+ {
+ getSession().getQpidSession().messageTransfer(qpidMessage.getExchangeName(),
+ qpidMessage.getQpidityMessage(),
+ org.apache.qpidity.client.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
+ org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE);
+ }
+ catch (IOException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
}
}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueConnectionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueConnectionImpl.java
new file mode 100644
index 0000000000..8d008bda7e
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpidity/jms/QueueConnectionImpl.java
@@ -0,0 +1,37 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.jms;
+
+import org.apache.qpidity.jms.ConnectionImpl;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.client.Client;
+
+import javax.jms.QueueConnection;
+
+/**
+ *
+ * Implements javax.jms.QueueConnection
+ */
+public class QueueConnectionImpl extends ConnectionImpl implements QueueConnection
+{
+ //-- constructor
+ public QueueConnectionImpl(String host,int port,String virtualHost,String username,String password) throws QpidException
+ {
+ super(host, port, virtualHost, username, password);
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java
index ce1d11c7e5..b9e1d5a9db 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java
@@ -32,6 +32,17 @@ public class QueueImpl extends DestinationImpl implements Queue
{
//--- Constructor
+
+ /**
+ * Create a new QueueImpl.
+ *
+ * @param session The session used to create this QueueImpl.
+ */
+ protected QueueImpl(SessionImpl session)
+ {
+ super(session);
+ }
+
/**
* Create a new QueueImpl with a given name.
*
@@ -41,25 +52,28 @@ public class QueueImpl extends DestinationImpl implements Queue
*/
protected QueueImpl(SessionImpl session, String name) throws QpidException
{
- super(session, name);
- _exchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
- _exchangeClass = ExchangeDefaults.DIRECT_EXCHANGE_CLASS;
+ super(session);
_queueName = name;
- // check that this queue exist on the server
- // As pasive is set the server will not create the queue.
- session.getQpidSession().queueDeclare(name, null, null, Option.PASSIVE);
+ _destinationName = name;
+ _exchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
+ _exchangeType = ExchangeDefaults.DIRECT_EXCHANGE_CLASS;
+ _isAutoDelete = false;
+ _isDurable = true;
+ _isExclusive = false;
+ registerQueue(false);
}
/**
* Create a destiantion from a binding URL
*
- * @param session The session used to create this queue.
+ * @param session The session used to create this queue.
* @param binding The URL
* @throws QpidException If the URL is not valid
*/
protected QueueImpl(SessionImpl session, BindingURL binding) throws QpidException
{
- super(session, binding);
+ super(session, binding);
+ registerQueue(false);
}
//---- Interface javax.jms.Queue
@@ -70,6 +84,35 @@ public class QueueImpl extends DestinationImpl implements Queue
*/
public String getQueueName() throws JMSException
{
- return super.getName();
+ return _queueName;
+ }
+
+ //---Private methods
+ /**
+ * Check that this queue exists and declare it if required.
+ *
+ * @param declare Specify whether the queue should be declared
+ * @throws QpidException If this queue does not exists on the broker.
+ */
+ protected void registerQueue(boolean declare) throws QpidException
+ {
+ // test if this exchange exist on the broker
+ //todo we can also specify if the excahnge is autodlete and durable
+ _session.getQpidSession().exchangeDeclare(_exchangeName, _exchangeType, null, null, Option.PASSIVE);
+ // wait for the broker response
+ _session.getQpidSession().sync();
+ // If this exchange does not exist then we will get an Expection 404 does notexist
+ //todo check for an execption
+ // now check if the queue exists
+ _session.getQpidSession().queueDeclare(_queueName, null, null, _isDurable ? Option.DURABLE : Option.NO_OPTION,
+ _isAutoDelete ? Option.AUTO_DELETE : Option.NO_OPTION,
+ _isExclusive ? Option.EXCLUSIVE : Option.NO_OPTION,
+ declare ? Option.PASSIVE : Option.NO_OPTION);
+ // wait for the broker response
+ _session.getQpidSession().sync();
+ // If this queue does not exist then we will get an Expection 404 does notexist
+ _session.getQpidSession().queueBind(_queueName, _exchangeName, _destinationName, null);
+ // we don't have to sync as we don't expect an error
}
+
}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/TemporaryQueueImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/TemporaryQueueImpl.java
index fe94531046..22e28f5c42 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/TemporaryQueueImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/TemporaryQueueImpl.java
@@ -18,8 +18,6 @@
package org.apache.qpidity.jms;
import org.apache.qpidity.QpidException;
-import org.apache.qpidity.Option;
-import org.apache.qpidity.exchange.ExchangeDefaults;
import javax.jms.TemporaryQueue;
import javax.jms.JMSException;
@@ -28,17 +26,16 @@ import java.util.UUID;
/**
* Implements TemporaryQueue
*/
-public class TemporaryQueueImpl extends DestinationImpl implements TemporaryQueue, TemporaryDestination
+public class TemporaryQueueImpl extends QueueImpl implements TemporaryQueue, TemporaryDestination
{
/**
* Indicates whether this temporary queue is deleted.
*/
- private boolean _isDeleted = false;
+ private boolean _isDeleted;
//--- constructor
-
/**
- * Create a new TemporaryQueueImpl with a given name.
+ * Create a new TemporaryQueueImpl.
*
* @param session The session used to create this TemporaryQueueImpl.
* @throws QpidException If creating the TemporaryQueueImpl fails due to some error.
@@ -46,14 +43,15 @@ public class TemporaryQueueImpl extends DestinationImpl implements TemporaryQueu
protected TemporaryQueueImpl(SessionImpl session) throws QpidException
{
// temporary destinations do not have names
- super(session, "NAME_NOT_SET");
- _exchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
- _exchangeClass = ExchangeDefaults.FANOUT_EXCHANGE_CLASS;
+ super(session);
_queueName = "TempQueue-" + UUID.randomUUID();
- // check that this queue exist on the server
- // As pasive is set the server will not create the queue.
- session.getQpidSession().queueDeclare(_queueName, null, null, Option.AUTO_DELETE);
- session.getQpidSession().queueBind(_queueName, _exchangeName, _queueName, null);
+ _destinationName = _queueName;
+ _isAutoDelete = false;
+ _isDurable = false;
+ _isExclusive = false;
+ _isDeleted = false;
+ // we must create this queue
+ registerQueue(true);
}
//-- TemporaryDestination Interface
@@ -67,7 +65,7 @@ public class TemporaryQueueImpl extends DestinationImpl implements TemporaryQueu
return _isDeleted;
}
- //-- TemporaryTopic Interface
+ //-- TemporaryQueue Interface
/**
* Delete this temporary destinaiton
*
@@ -75,18 +73,12 @@ public class TemporaryQueueImpl extends DestinationImpl implements TemporaryQueu
*/
public void delete() throws JMSException
{
- // todo delete this temporary queue
+ if (!_isDeleted)
+ {
+ _session.getQpidSession().queueDelete(_queueName);
+ }
_isDeleted = true;
}
- //---- Interface javax.jms.Queue
- /**
- * Gets the name of this queue.
- *
- * @return This queue's name.
- */
- public String getQueueName() throws JMSException
- {
- return super.getName();
- }
}
+
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/TemporaryTopicImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/TemporaryTopicImpl.java
index e103c9dad1..092ee15b2f 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/TemporaryTopicImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/TemporaryTopicImpl.java
@@ -21,6 +21,7 @@ import org.apache.qpidity.QpidException;
import javax.jms.TemporaryTopic;
import javax.jms.JMSException;
+import java.util.UUID;
/**
@@ -43,7 +44,7 @@ public class TemporaryTopicImpl extends TopicImpl implements TemporaryTopic, Tem
protected TemporaryTopicImpl(SessionImpl session) throws QpidException
{
// temporary destinations do not have names.
- super(session, "NAME_NOT_SET");
+ super(session, "TemporayTopic-" + UUID.randomUUID());
}
//-- TemporaryDestination Interface
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/TopicConnectionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/TopicConnectionImpl.java
new file mode 100644
index 0000000000..f8f19a9e33
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpidity/jms/TopicConnectionImpl.java
@@ -0,0 +1,35 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.jms;
+
+import org.apache.qpidity.QpidException;
+
+import javax.jms.TopicConnection;
+
+/**
+ * Implements javax.jms.TopicConnection
+ */
+public class TopicConnectionImpl extends ConnectionImpl implements TopicConnection
+{
+ //-- constructor
+ public TopicConnectionImpl(String host, int port, String virtualHost, String username, String password)
+ throws QpidException
+ {
+ super(host, port, virtualHost, username, password);
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java
index a439c514d4..4fc0a28ecb 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/TopicImpl.java
@@ -18,10 +18,12 @@
package org.apache.qpidity.jms;
import org.apache.qpidity.QpidException;
+import org.apache.qpidity.Option;
import org.apache.qpidity.exchange.ExchangeDefaults;
import org.apache.qpidity.url.BindingURL;
import javax.jms.Topic;
+import java.util.UUID;
/**
* Implementation of the javax.jms.Topic interface.
@@ -38,9 +40,15 @@ public class TopicImpl extends DestinationImpl implements Topic
*/
public TopicImpl(SessionImpl session, String name) throws QpidException
{
- super(session, name);
+ super(session);
+ _queueName = "Topic-" + UUID.randomUUID();
+ _destinationName = name;
_exchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
- _exchangeClass = ExchangeDefaults.TOPIC_EXCHANGE_CLASS;
+ _exchangeType = ExchangeDefaults.TOPIC_EXCHANGE_CLASS;
+ _isAutoDelete = true;
+ _isDurable = false;
+ _isExclusive = true;
+ checkTopicExists();
}
/**
@@ -53,6 +61,7 @@ public class TopicImpl extends DestinationImpl implements Topic
protected TopicImpl(SessionImpl session, BindingURL binding) throws QpidException
{
super(session, binding);
+ checkTopicExists();
}
//--- javax.jsm.Topic Interface
@@ -63,7 +72,20 @@ public class TopicImpl extends DestinationImpl implements Topic
*/
public String getTopicName()
{
- return super.getName();
+ return _destinationName;
}
+ /**
+ * Check that this topic exchange
+ *
+ * @throws QpidException If this queue does not exists on the broker.
+ */
+ protected void checkTopicExists() throws QpidException
+ {
+ // test if this exchange exist on the broker
+ _session.getQpidSession().exchangeDeclare(_exchangeName, _exchangeType, null, null, Option.PASSIVE);
+ // wait for the broker response
+ _session.getQpidSession().sync();
+ // todo get the exception
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java
index 3863adb07a..1a7fae5680 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java
@@ -28,11 +28,23 @@ import javax.jms.XASession;
*/
public class XAConnectionImpl extends ConnectionImpl implements XAConnection
{
- public XAConnectionImpl(String host, int port, String virtualHost, String username, String password) throws QpidException
+ //-- constructor
+ /**
+ * Create a XAConnection.
+ *
+ * @param host The broker host name.
+ * @param port The port on which the broker is listening for connection.
+ * @param virtualHost The virtual host on which the broker is deployed.
+ * @param username The user name used of user identification.
+ * @param password The password name used of user identification.
+ * @throws QpidException If creating a connection fails due to some internal error.
+ */
+ protected XAConnectionImpl(String host, int port, String virtualHost, String username, String password) throws QpidException
{
super(host, port, virtualHost, username, password);
}
+ //-- interface XAConnection
/**
* Creates an XASession.
*
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/XAQueueConnectionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/XAQueueConnectionImpl.java
new file mode 100644
index 0000000000..55d46c874b
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpidity/jms/XAQueueConnectionImpl.java
@@ -0,0 +1,72 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.jms;
+
+import org.apache.qpidity.QpidException;
+
+import javax.jms.XAQueueConnection;
+import javax.jms.JMSException;
+import javax.jms.XAQueueSession;
+
+/**
+ * Implements XAQueueConnection
+ */
+public class XAQueueConnectionImpl extends XAConnectionImpl implements XAQueueConnection
+{
+ //-- constructor
+ /**
+ * Create a XAQueueConnection.
+ *
+ * @param host The broker host name.
+ * @param port The port on which the broker is listening for connection.
+ * @param virtualHost The virtual host on which the broker is deployed.
+ * @param username The user name used of user identification.
+ * @param password The password name used of user identification.
+ * @throws QpidException If creating a XAQueueConnection fails due to some internal error.
+ */
+ public XAQueueConnectionImpl(String host, int port, String virtualHost, String username, String password)
+ throws QpidException
+ {
+ super(host, port, virtualHost, username, password);
+ }
+
+ //-- Interface XAQueueConnection
+ /**
+ * Creates an XAQueueSession.
+ *
+ * @return A newly created XASession.
+ * @throws JMSException If the XAQueueConnectionImpl fails to create an XASession due to
+ * some internal error.
+ */
+ public synchronized XAQueueSession createXAQueueSession() throws JMSException
+ {
+ checkNotClosed();
+ XAQueueSessionImpl xaQueueSession;
+ try
+ {
+ xaQueueSession = new XAQueueSessionImpl(this);
+ }
+ catch (QpidException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ // add this session with the list of session that are handled by this connection
+ _sessions.add(xaQueueSession);
+ return xaQueueSession;
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/XAQueueSessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/XAQueueSessionImpl.java
new file mode 100644
index 0000000000..86aad99064
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpidity/jms/XAQueueSessionImpl.java
@@ -0,0 +1,64 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.jms;
+
+import org.apache.qpidity.QpidException;
+
+import javax.jms.QueueSession;
+import javax.jms.JMSException;
+import javax.jms.XAQueueSession;
+
+/**
+ * Imeplements javax.jms.XAQueueSessionImpl
+ */
+public class XAQueueSessionImpl extends XASessionImpl implements XAQueueSession
+{
+ /**
+ * The standard session
+ */
+ private QueueSession _jmsQueueSession;
+
+ //-- Constructors
+ /**
+ * Create a JMS XAQueueSessionImpl
+ *
+ * @param connection The ConnectionImpl object from which the Session is created.
+ * @throws org.apache.qpidity.QpidException
+ * In case of internal error.
+ */
+ protected XAQueueSessionImpl(ConnectionImpl connection) throws QpidException
+ {
+ super(connection);
+ }
+
+ //--- interface XAQueueSession
+ /**
+ * Gets the topic session associated with this <CODE>XATopicSession</CODE>.
+ *
+ * @return the topic session object
+ * @throws JMSException If an internal error occurs.
+ */
+ public QueueSession getQueueSession() throws JMSException
+ {
+ if (_jmsQueueSession == null)
+ {
+ _jmsQueueSession = getConnection().createQueueSession(true, getAcknowledgeMode());
+ }
+ return _jmsQueueSession;
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/XASessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/XASessionImpl.java
index a1d5a345c2..abd8bd9c79 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/XASessionImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/XASessionImpl.java
@@ -41,6 +41,11 @@ public class XASessionImpl extends SessionImpl implements XASession
*/
private DtxSession _qpidDtxSession;
+ /**
+ * The standard session
+ */
+ private Session _jmsSession;
+
//-- Constructors
/**
* Create a JMS XASession
@@ -62,13 +67,16 @@ public class XASessionImpl extends SessionImpl implements XASession
/**
* Gets the session associated with this XASession.
*
- * @return the session object
+ * @return The session object.
* @throws JMSException if an internal error occurs.
- * @since 1.1
*/
public Session getSession() throws JMSException
{
- return this;
+ if( _jmsSession == null )
+ {
+ _jmsSession = getConnection().createSession(true, getAcknowledgeMode());
+ }
+ return _jmsSession;
}
/**
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/XATopicConnectionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/XATopicConnectionImpl.java
new file mode 100644
index 0000000000..3dddc38e23
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpidity/jms/XATopicConnectionImpl.java
@@ -0,0 +1,71 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.jms;
+
+import org.apache.qpidity.QpidException;
+
+import javax.jms.XATopicConnection;
+import javax.jms.JMSException;
+import javax.jms.XATopicSession;
+
+/**
+ * implements javax.jms.XATopicConnection
+ */
+public class XATopicConnectionImpl extends XAConnectionImpl implements XATopicConnection
+{
+ //-- constructor
+ /**
+ * Create a XATopicConnection.
+ *
+ * @param host The broker host name.
+ * @param port The port on which the broker is listening for connection.
+ * @param virtualHost The virtual host on which the broker is deployed.
+ * @param username The user name used of user identification.
+ * @param password The password name used of user identification.
+ * @throws QpidException If creating a XATopicConnection fails due to some internal error.
+ */
+ public XATopicConnectionImpl(String host, int port, String virtualHost, String username, String password)
+ throws QpidException
+ {
+ super(host, port, virtualHost, username, password);
+ }
+
+ /**
+ * Creates an XATopicSession.
+ *
+ * @return A newly created XATopicSession.
+ * @throws JMSException If the XAConnectiono fails to create an XATopicSession due to
+ * some internal error.
+ */
+ public synchronized XATopicSession createXATopicSession() throws JMSException
+ {
+ checkNotClosed();
+ XATopicSessionImpl xaTopicSession;
+ try
+ {
+ xaTopicSession = new XATopicSessionImpl(this);
+ }
+ catch (QpidException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ // add this session with the list of session that are handled by this connection
+ _sessions.add(xaTopicSession);
+ return xaTopicSession;
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/XATopicSessionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/XATopicSessionImpl.java
new file mode 100644
index 0000000000..3fc948901c
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpidity/jms/XATopicSessionImpl.java
@@ -0,0 +1,63 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.jms;
+
+import org.apache.qpidity.QpidException;
+
+import javax.jms.*;
+
+/**
+ * Implements javax.jms.XATopicSession and javax.jms.TopicSession
+ */
+public class XATopicSessionImpl extends XASessionImpl implements XATopicSession
+{
+ /**
+ * The standard session
+ */
+ private TopicSession _jmsTopicSession;
+
+ //-- Constructors
+ /**
+ * Create a JMS XASession
+ *
+ * @param connection The ConnectionImpl object from which the Session is created.
+ * @throws org.apache.qpidity.QpidException
+ * In case of internal error.
+ */
+ protected XATopicSessionImpl(ConnectionImpl connection) throws QpidException
+ {
+ super(connection);
+ }
+
+ //--- interface XATopicSession
+
+ /**
+ * Gets the topic session associated with this <CODE>XATopicSession</CODE>.
+ *
+ * @return the topic session object
+ * @throws JMSException If an internal error occurs.
+ */
+ public TopicSession getTopicSession() throws JMSException
+ {
+ if (_jmsTopicSession == null)
+ {
+ _jmsTopicSession = getConnection().createTopicSession(true, getAcknowledgeMode());
+ }
+ return _jmsTopicSession;
+ }
+}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java
index 7d5ae755f5..1bd93d792d 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java
@@ -885,7 +885,7 @@ public class MessageImpl extends QpidMessage implements Message
/**
* This method is invoked after this message is received.
*
- * @throws QpidException If there is an internal error when procesing this message.
+ * @throws QpidException If there is an internal error when procesing this message.
*/
public void afterMessageReceive() throws QpidException
{
@@ -922,4 +922,5 @@ public class MessageImpl extends QpidMessage implements Message
{
_messageConsumer = messageConsumer;
}
+
}
diff --git a/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java b/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
index 7c46e191ef..2b7316e847 100644
--- a/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
+++ b/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
@@ -348,6 +348,35 @@ public class QpidMessage
}
/**
+ * Set this message AMQP routingkey
+ *
+ * @param routingKey This message AMQP routingkey
+ */
+ public void setRoutingKey(String routingKey)
+ {
+ _qpidityMessage.getDeliveryProperties().setRoutingKey(routingKey);
+ }
+
+ /**
+ * Set this message AMQP exchange name.
+ *
+ * @param exchangeName This message AMQP exchange name.
+ */
+ public void setExchangeName(String exchangeName)
+ {
+ _qpidityMessage.getDeliveryProperties().setExchange(exchangeName);
+ }
+
+ /**
+ * Get this message excahgne name
+ * @return this message excahgne name
+ */
+ public String getExchangeName()
+ {
+ return _qpidityMessage.getDeliveryProperties().getExchange();
+ }
+
+ /**
* This method is invoked before a message dispatch operation.
*
* @throws QpidException If the destination is not set
@@ -368,6 +397,16 @@ public class QpidMessage
throw new QpidException("IO exception when sending message", ErrorCode.UNDEFINED, e);
}
}
+
+ /**
+ * Get the underlying qpidity message
+ *
+ * @return The underlying qpidity message.
+ */
+ public org.apache.qpidity.api.Message getQpidityMessage()
+ {
+ return _qpidityMessage;
+ }
}