diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2011-12-12 16:04:27 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2011-12-12 16:04:27 +0000 |
commit | f0c528be47bbdaaee5398dc68c4b63508f3bbc23 (patch) | |
tree | 0ce493eb2e921df1851304af8506cd8f6be27345 /java/client | |
parent | 7ea6bf6f6a11cd81788952c37c4463eb4b2e6927 (diff) | |
download | qpid-python-f0c528be47bbdaaee5398dc68c4b63508f3bbc23.tar.gz |
QPID-3625 Comitting a patch from Weston Price.
(cherry picked from commit e7b630fd35055c9b80e3799c6939c7f3687b0c02)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1213290 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
5 files changed, 73 insertions, 15 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index afb0e45f7a..7fc1d25c18 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -50,6 +50,8 @@ public interface AMQConnectionDelegate XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException; + XASession createXASession(int ackMode) throws JMSException; + void failoverPrep(); void resubscribeSessions() throws JMSException, AMQException, FailoverException; diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 24ec94e1b8..802cc55b0e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -164,6 +164,35 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec return session; } + @Override + public XASession createXASession(int ackMode) + throws JMSException + { + + _conn.checkNotClosed(); + + if (_conn.channelLimitReached()) + { + throw new ChannelLimitReachedException(_conn.getMaximumChannelCount()); + } + + int channelId = _conn.getNextChannelID(); + XASessionImpl session; + try + { + session = new XASessionImpl(_qpidConnection, _conn, channelId, ackMode, (int)_conn.getMaxPrefetch(), (int)_conn.getMaxPrefetch() / 2); + _conn.registerSession(channelId, session); + if (_conn._started) + { + session.start(); + } + } + catch (Exception e) + { + throw new JMSAMQException("cannot create session", e); + } + return session; + } /** * Make a connection with the broker diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index bff4df0e93..399534e834 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -161,8 +161,8 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate _conn._failoverPolicy.attainedConnection(); _conn._connected = true; return null; - } - else + } + else { return _conn._protocolHandler.getSuggestedProtocolVersion(); } @@ -175,11 +175,16 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate return createSession(transacted, acknowledgeMode, prefetch, prefetch); } + public XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException { throw new UnsupportedOperationException("0_8 version does not provide XA support"); } + public XASession createXASession(int ackMode) throws JMSException + { + throw new UnsupportedOperationException("0_8 version does not provide XA support"); + } public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetchHigh, final int prefetchLow) throws JMSException { diff --git a/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java b/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java index 97048f39f4..509aa25bd5 100644 --- a/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java +++ b/java/client/src/main/java/org/apache/qpid/client/XAConnectionImpl.java @@ -5,9 +5,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -75,4 +75,11 @@ public class XAConnectionImpl extends AMQConnection implements XAConnection, XAQ { return (XATopicSession) createXASession(); } + + //Specialized call for JCA + public XASession createXASession(int ackMode) throws JMSException + { + checkNotClosed(); + return _delegate.createXASession(ackMode); + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java b/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java index e7b56e95a8..aaabf613fc 100644 --- a/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java +++ b/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java @@ -6,7 +6,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE 2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an @@ -43,21 +43,36 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic private Session _jmsSession; - //-- Constructors + // Constructors /** * Create a JMS XASession */ public XASessionImpl(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId, int defaultPrefetchHigh, int defaultPrefetchLow) { - super(qpidConnection, con, channelId, false, // this is not a transacted session - Session.AUTO_ACKNOWLEDGE, - MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow,null); + this(qpidConnection, con, channelId, false, Session.AUTO_ACKNOWLEDGE, + MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow, null); + } + + public XASessionImpl(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId, + int ackMode, int defaultPrefetchHigh, int defaultPrefetchLow) + { + this(qpidConnection, con, channelId, false, ackMode, MessageFactoryRegistry.newDefaultRegistry(), + defaultPrefetchHigh, defaultPrefetchLow, null); + + } + + public XASessionImpl(org.apache.qpid.transport.Connection qpidConnection, AMQConnection con, int channelId, + boolean transacted, int ackMode, MessageFactoryRegistry registry, int defaultPrefetchHigh, int defaultPrefetchLow, + String name) + { + super(qpidConnection, con, channelId, transacted, ackMode, registry, defaultPrefetchHigh, defaultPrefetchLow, name); createSession(); _xaResource = new XAResourceImpl(this); - } + } + - //-- public methods + // public methods /** * Create a qpid session. @@ -70,7 +85,7 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic } - //--- javax.njms.XASEssion API + // javax.njms.XASEssion API /** * Gets the session associated with this XASession. @@ -97,7 +112,7 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic return _xaResource; } - //-- overwritten mehtods + // overwritten mehtods /** * Throws a {@link TransactionInProgressException}, since it should * not be called for an XASession object. @@ -132,7 +147,7 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic return _qpidDtxSession; } - //--- interface XAQueueSession + // interface XAQueueSession /** * Gets the topic session associated with this <CODE>XATopicSession</CODE>. * @@ -144,7 +159,7 @@ public class XASessionImpl extends AMQSession_0_10 implements XASession, XATopic return (QueueSession) getSession(); } - //--- interface XATopicSession + // interface XATopicSession /** * Gets the topic session associated with this <CODE>XATopicSession</CODE>. |