summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2011-12-12 16:04:27 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2011-12-12 16:04:27 +0000
commitf0c528be47bbdaaee5398dc68c4b63508f3bbc23 (patch)
tree0ce493eb2e921df1851304af8506cd8f6be27345 /java/client
parent7ea6bf6f6a11cd81788952c37c4463eb4b2e6927 (diff)
downloadqpid-python-f0c528be47bbdaaee5398dc68c4b63508f3bbc23.tar.gz
QPID-3625 Comitting a patch from Weston Price.
(cherry picked from commit e7b630fd35055c9b80e3799c6939c7f3687b0c02) git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1213290 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java29
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java9
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java11
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java37
5 files changed, 73 insertions, 15 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
index afb0e45f7a..7fc1d25c18 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
@@ -50,6 +50,8 @@ public interface AMQConnectionDelegate
XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException;
+ XASession createXASession(int ackMode) throws JMSException;
+
void failoverPrep();
void resubscribeSessions() throws JMSException, AMQException, FailoverException;
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index 24ec94e1b8..802cc55b0e 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -164,6 +164,35 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
return session;
}
+ @Override
+ public XASession createXASession(int ackMode)
+ throws JMSException
+ {
+
+ _conn.checkNotClosed();
+
+ if (_conn.channelLimitReached())
+ {
+ throw new ChannelLimitReachedException(_conn.getMaximumChannelCount());
+ }
+
+ int channelId = _conn.getNextChannelID();
+ XASessionImpl session;
+ try
+ {
+ session = new XASessionImpl(_qpidConnection, _conn, channelId, ackMode, (int)_conn.getMaxPrefetch(), (int)_conn.getMaxPrefetch() / 2);
+ _conn.registerSession(channelId, session);
+ if (_conn._started)
+ {
+ session.start();
+ }
+ }
+ catch (Exception e)
+ {
+ throw new JMSAMQException("cannot create session", e);
+ }
+ return session;
+ }
/**
* Make a connection with the broker
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index bff4df0e93..399534e834 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -161,8 +161,8 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
_conn._failoverPolicy.attainedConnection();
_conn._connected = true;
return null;
- }
- else
+ }
+ else
{
return _conn._protocolHandler.getSuggestedProtocolVersion();
}
@@ -175,11 +175,16 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
return createSession(transacted, acknowledgeMode, prefetch, prefetch);
}
+
public XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException
{
throw new UnsupportedOperationException("0_8 version does not provide XA support");
}
+ public XASession createXASession(int ackMode) throws JMSException
+ {
+ throw new UnsupportedOperationException("0_8 version does not provide XA support");
+ }
public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode,
final int prefetchHigh, final int prefetchLow) throws JMSException
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java b/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java
index 97048f39f4..509aa25bd5 100644
--- a/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java
@@ -5,9 +5,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -75,4 +75,11 @@ public class XAConnectionImpl extends AMQConnection implements XAConnection, XAQ
{
return (XATopicSession) createXASession();
}
+
+ //Specialized call for JCA
+ public XASession createXASession(int ackMode) throws JMSException
+ {
+ checkNotClosed();
+ return _delegate.createXASession(ackMode);
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java b/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
index e7b56e95a8..aaabf613fc 100644
--- a/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
@@ -6,7 +6,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE 2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -43,21 +43,36 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic
private Session _jmsSession;
- //-- Constructors
+ // Constructors
/**
* Create a JMS XASession
*/
public XASessionImpl(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId,
int defaultPrefetchHigh, int defaultPrefetchLow)
{
- super(qpidConnection, con, channelId, false, // this is not a transacted session
- Session.AUTO_ACKNOWLEDGE,
- MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow,null);
+ this(qpidConnection, con, channelId, false, Session.AUTO_ACKNOWLEDGE,
+ MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow, null);
+ }
+
+ public XASessionImpl(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId,
+ int ackMode, int defaultPrefetchHigh, int defaultPrefetchLow)
+ {
+ this(qpidConnection, con, channelId, false, ackMode, MessageFactoryRegistry.newDefaultRegistry(),
+ defaultPrefetchHigh, defaultPrefetchLow, null);
+
+ }
+
+ public XASessionImpl(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId,
+ boolean transacted, int ackMode, MessageFactoryRegistry registry, int defaultPrefetchHigh, int defaultPrefetchLow,
+ String name)
+ {
+ super(qpidConnection, con, channelId, transacted, ackMode, registry, defaultPrefetchHigh, defaultPrefetchLow, name);
createSession();
_xaResource = new XAResourceImpl(this);
- }
+ }
+
- //-- public methods
+ // public methods
/**
* Create a qpid session.
@@ -70,7 +85,7 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic
}
- //--- javax.njms.XASEssion API
+ // javax.njms.XASEssion API
/**
* Gets the session associated with this XASession.
@@ -97,7 +112,7 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic
return _xaResource;
}
- //-- overwritten mehtods
+ // overwritten mehtods
/**
* Throws a {@link TransactionInProgressException}, since it should
* not be called for an XASession object.
@@ -132,7 +147,7 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic
return _qpidDtxSession;
}
- //--- interface XAQueueSession
+ // interface XAQueueSession
/**
* Gets the topic session associated with this <CODE>XATopicSession</CODE>.
*
@@ -144,7 +159,7 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic
return (QueueSession) getSession();
}
- //--- interface XATopicSession
+ // interface XATopicSession
/**
* Gets the topic session associated with this <CODE>XATopicSession</CODE>.