From 327b9217006cef5d9f0d4736ba1f55ea6e13ebe2 Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Wed, 18 Apr 2007 22:07:01 +0000 Subject: added state support for distributed transactions git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/client_restructure@530180 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/nclient/amqp/qpid/QpidAMQPChannel.java | 8 +- .../nclient/amqp/qpid/QpidAMQPDtxDemarcation.java | 88 +++++++++++++++++++--- .../apache/qpid/nclient/amqp/state/AMQPState.java | 6 ++ 3 files changed, 86 insertions(+), 16 deletions(-) (limited to 'java/newclient') diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java index 928780353d..decb120796 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPChannel.java @@ -130,7 +130,7 @@ public class QpidAMQPChannel extends AMQPStateMachine implements AMQPMethodListe //_channelNotOpend.await(_serverTimeOut, TimeUnit.MILLISECONDS); _channelNotOpend.await(); - checkIfConnectionClosed(); + checkIfChannelClosed(); AMQPValidator.throwExceptionOnNull(_channelOpenOkBody, "The broker didn't send the ChannelOpenOkBody in time"); notifyState(AMQPState.CHANNEL_OPENED); _currentState = AMQPState.CHANNEL_OPENED; @@ -198,7 +198,7 @@ public class QpidAMQPChannel extends AMQPStateMachine implements AMQPMethodListe //_channelFlowNotResponded.await(_serverTimeOut, TimeUnit.MILLISECONDS); _channelFlowNotResponded.await(); - checkIfConnectionClosed(); + checkIfChannelClosed(); AMQPValidator.throwExceptionOnNull(_channelFlowOkBody, "The broker didn't send the ChannelFlowOkBody in time"); handleChannelFlowState(_channelFlowOkBody.active); return _channelFlowOkBody; @@ -228,7 +228,7 @@ public class QpidAMQPChannel extends AMQPStateMachine implements AMQPMethodListe //_channelNotResumed.await(_serverTimeOut, TimeUnit.MILLISECONDS); _channelNotResumed.await(); - checkIfConnectionClosed(); + checkIfChannelClosed(); AMQPValidator.throwExceptionOnNull(_channelOkBody, "The broker didn't send the ChannelOkBody in response to the ChannelResumeBody in time"); notifyState(AMQPState.CHANNEL_OPENED); @@ -330,7 +330,7 @@ public class QpidAMQPChannel extends AMQPStateMachine implements AMQPMethodListe } } - private void checkIfConnectionClosed() throws AMQPException + private void checkIfChannelClosed() throws AMQPException { if (_channelCloseBody != null) { diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java index a418de7ff6..eb2cdb4d01 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPDtxDemarcation.java @@ -1,9 +1,11 @@ package org.apache.qpid.nclient.amqp.qpid; +import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.apache.log4j.Logger; +import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.DtxDemarcationEndBody; import org.apache.qpid.framing.DtxDemarcationEndOkBody; import org.apache.qpid.framing.DtxDemarcationSelectBody; @@ -11,16 +13,24 @@ import org.apache.qpid.framing.DtxDemarcationSelectOkBody; import org.apache.qpid.framing.DtxDemarcationStartBody; import org.apache.qpid.framing.DtxDemarcationStartOkBody; import org.apache.qpid.nclient.amqp.AMQPDtxDemarcation; +import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent; +import org.apache.qpid.nclient.amqp.event.AMQPMethodListener; import org.apache.qpid.nclient.amqp.state.AMQPState; +import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent; +import org.apache.qpid.nclient.amqp.state.AMQPStateMachine; import org.apache.qpid.nclient.amqp.state.AMQPStateManager; +import org.apache.qpid.nclient.amqp.state.AMQPStateType; +import org.apache.qpid.nclient.config.ClientConfiguration; import org.apache.qpid.nclient.core.AMQPException; import org.apache.qpid.nclient.core.Phase; +import org.apache.qpid.nclient.core.QpidConstants; +import org.apache.qpid.nclient.util.AMQPValidator; -public class QpidAMQPDtxDemarcation implements AMQPDtxDemarcation +public class QpidAMQPDtxDemarcation extends AMQPStateMachine implements AMQPMethodListener, AMQPDtxDemarcation { private static final Logger _logger = Logger.getLogger(QpidAMQPDtxDemarcation.class); - // the channelId assigned for this channel + // the channelId that will be used for transactions private int _channelId; private Phase _phase; @@ -29,28 +39,62 @@ public class QpidAMQPDtxDemarcation implements AMQPDtxDemarcation private AMQPStateManager _stateManager; - private final AMQPState[] _validCloseStates = new AMQPState[] - { AMQPState.CHANNEL_OPENED, AMQPState.CHANNEL_SUSPEND }; + private final AMQPState[] _validEndStates = new AMQPState[] + { AMQPState.DTX_STARTED }; - private final AMQPState[] _validResumeStates = new AMQPState[] - { AMQPState.CHANNEL_CLOSED, AMQPState.CHANNEL_NOT_OPENED }; + private final AMQPState[] _validStartStates = new AMQPState[] + { AMQPState.DTX_NOT_STARTED, AMQPState.DTX_END }; // The wait period until a server sends a respond private long _serverTimeOut = 1000; private final Lock _lock = new ReentrantLock(); + private final Condition _dtxNotSelected = _lock.newCondition(); + + private final Condition _channelNotClosed = _lock.newCondition(); - public DtxDemarcationEndOkBody end(DtxDemarcationEndBody dtxDemarcationEndBody) throws AMQPException + private DtxDemarcationSelectOkBody _dtxDemarcationSelectOkBody; + + protected QpidAMQPDtxDemarcation(int channelId, Phase phase, AMQPStateManager stateManager) { - // TODO Auto-generated method stub - return null; - } + _channelId = channelId; + _phase = phase; + _stateManager = stateManager; + _currentState = AMQPState.DTX_CHANNEL_NOT_SELECTED; + _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS); + } + /** + * ------------------------------------------- + * API Methods + * -------------------------------------------- + */ public DtxDemarcationSelectOkBody select(DtxDemarcationSelectBody dtxDemarcationSelectBody) throws AMQPException { - // TODO Auto-generated method stub - return null; + _lock.lock(); + try + { + _dtxDemarcationSelectOkBody = null; + checkIfValidStateTransition(AMQPState.DTX_CHANNEL_NOT_SELECTED, _currentState, AMQPState.DTX_NOT_STARTED); + AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, _dtxDemarcationSelectOkBody, QpidConstants.EMPTY_CORRELATION_ID); + _phase.messageSent(msg); + + //_channelNotOpend.await(_serverTimeOut, TimeUnit.MILLISECONDS); + _dtxNotSelected.await(); + AMQPValidator.throwExceptionOnNull(_dtxDemarcationSelectOkBody, "The broker didn't send the DtxDemarcationSelectOkBody in time"); + notifyState(AMQPState.CHANNEL_OPENED); + _currentState = AMQPState.CHANNEL_OPENED; + return _dtxDemarcationSelectOkBody; + } + catch (Exception e) + { + throw new AMQPException("Error in dtx.select", e); + } + finally + { + _lock.unlock(); + } } public DtxDemarcationStartOkBody start(DtxDemarcationStartBody dtxDemarcationStartBody) throws AMQPException @@ -58,5 +102,25 @@ public class QpidAMQPDtxDemarcation implements AMQPDtxDemarcation // TODO Auto-generated method stub return null; } + + public DtxDemarcationEndOkBody end(DtxDemarcationEndBody dtxDemarcationEndBody) throws AMQPException + { + // TODO Auto-generated method stub + return null; + } + + /** + * ------------------------------------------- + * AMQPMethodListener methods + * -------------------------------------------- + */ + public boolean methodReceived(AMQPMethodEvent evt) throws AMQPException + { + return true; + } + private void notifyState(AMQPState newState) throws AMQPException + { + _stateManager.notifyStateChanged(new AMQPStateChangedEvent(_currentState, newState,AMQPStateType.CHANNEL_STATE)); + } } diff --git a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java index 061ec5a849..18219abc46 100644 --- a/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java +++ b/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java @@ -57,4 +57,10 @@ public class AMQPState public static final AMQPState CHANNEL_OPENED = new AMQPState(11, "CHANNEL_OPENED"); public static final AMQPState CHANNEL_CLOSED = new AMQPState(11, "CHANNEL_CLOSED"); public static final AMQPState CHANNEL_SUSPEND = new AMQPState(11, "CHANNEL_SUSPEND"); + + // Distributed Transaction state + public static final AMQPState DTX_CHANNEL_NOT_SELECTED = new AMQPState(10, "DTX_CHANNEL_NOT_SELECTED"); + public static final AMQPState DTX_NOT_STARTED = new AMQPState(10, "DTX_NOT_STARTED"); + public static final AMQPState DTX_STARTED = new AMQPState(10, "DTX_STARTED"); + public static final AMQPState DTX_END = new AMQPState(10, "DTX_END"); } -- cgit v1.2.1