diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2013-11-21 11:31:30 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2013-11-21 11:31:30 +0000 |
| commit | 5ea8ab45cbd0181bc1acef84f4f1932e21c096cf (patch) | |
| tree | 14a24303b50540d463d74f2915ecb662e5d496bf /qpid/java | |
| parent | 057b915ee553d3c2843d5178c226454b64d81ef0 (diff) | |
| download | qpid-python-5ea8ab45cbd0181bc1acef84f4f1932e21c096cf.tar.gz | |
QPID-5344 : Deadlock in JMS AMQP 1.0 client - patch from David Ingham
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1544129 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
2 files changed, 109 insertions, 97 deletions
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java index 20454ace65..976ae10c56 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java @@ -175,43 +175,48 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect connect(); started = true; } + } - try + try + { + SessionImpl session = new SessionImpl(this, acknowledgeMode); + session.setQueueSession(_isQueueConnection); + session.setTopicSession(_isTopicConnection); + + boolean connectionStarted = false; + synchronized(_lock) { - SessionImpl session = new SessionImpl(this, acknowledgeMode); - session.setQueueSession(_isQueueConnection); - session.setTopicSession(_isTopicConnection); + checkClosed(); _sessions.add(session); - - if(_state == State.STARTED) - { - session.start(); - } - - return session; + connectionStarted = _state == State.STARTED; } - catch(JMSException e) + + if(connectionStarted) { - Error remoteError; - if(started - && e.getLinkedException() instanceof ConnectionErrorException - && (remoteError = ((ConnectionErrorException)e.getLinkedException()).getRemoteError()).getCondition() == ConnectionError.REDIRECT) - { - String networkHost = (String) remoteError.getInfo().get(Symbol.valueOf("network-host")); - int port = (Integer) remoteError.getInfo().get(Symbol.valueOf("port")); - String hostName = (String) remoteError.getInfo().get(Symbol.valueOf("hostname")); - reconnect(networkHost,port,hostName); - return createSession(acknowledgeMode); - - } - else - { - throw e; - } + session.start(); } + + return session; } + catch(JMSException e) + { + Error remoteError; + if(started + && e.getLinkedException() instanceof ConnectionErrorException + && (remoteError = ((ConnectionErrorException)e.getLinkedException()).getRemoteError()).getCondition() == ConnectionError.REDIRECT) + { + String networkHost = (String) remoteError.getInfo().get(Symbol.valueOf("network-host")); + int port = (Integer) remoteError.getInfo().get(Symbol.valueOf("port")); + String hostName = (String) remoteError.getInfo().get(Symbol.valueOf("hostname")); + reconnect(networkHost,port,hostName); + return createSession(acknowledgeMode); - + } + else + { + throw e; + } + } } void removeSession(SessionImpl session) @@ -272,6 +277,7 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect public void start() throws JMSException { + List<SessionImpl> stoppedSessions = null; synchronized(_lock) { checkClosed(); @@ -281,30 +287,30 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect // TODO _state = State.STARTED; - - for(SessionImpl session : _sessions) - { - session.start(); - } - + stoppedSessions = new ArrayList<SessionImpl>(_sessions); } _lock.notifyAll(); } + if (stoppedSessions != null) + { + for(SessionImpl session : stoppedSessions) + { + session.start(); + } + } } public void stop() throws JMSException { + List<SessionImpl> startedSessions = null; synchronized(_lock) { switch(_state) { case STARTED: - for(SessionImpl session : _sessions) - { - session.stop(); - } + startedSessions = new ArrayList<SessionImpl>(_sessions); case UNCONNECTED: _state = State.STOPPED; break; @@ -314,6 +320,14 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect _lock.notifyAll(); } + + if (startedSessions != null) + { + for(SessionImpl session : startedSessions) + { + session.stop(); + } + } } @@ -341,39 +355,34 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect public void close() throws JMSException { - Object outerLock; - if(_conn != null) + List<SessionImpl> sessions = null; + List<CloseTask> closeTasks = null; + boolean closeConnection = false; + synchronized(_lock) { - outerLock = _conn.getEndpoint().getLock(); + if(_state != State.CLOSED) + { + _state = State.CLOSED; + sessions = new ArrayList<SessionImpl>(_sessions); + closeTasks = new ArrayList<CloseTask>(_closeTasks); + closeConnection = _conn != null && _state != State.UNCONNECTED; + } + + _lock.notifyAll(); } - else + + if (sessions != null) { - outerLock = _lock; - } - - synchronized (outerLock) - { - synchronized(_lock) + for(SessionImpl session : sessions) { - if(_state != State.CLOSED) - { - stop(); - List<SessionImpl> sessions = new ArrayList<SessionImpl>(_sessions); - for(SessionImpl session : sessions) - { - session.close(); - } - for(CloseTask task : _closeTasks) - { - task.onClose(); - } - if(_conn != null && _state != State.UNCONNECTED ) { - _conn.close(); - } - _state = State.CLOSED; - } - - _lock.notifyAll(); + session.close(); + } + for(CloseTask task : closeTasks) + { + task.onClose(); + } + if(closeConnection) { + _conn.close(); } } } diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java index 5b9a67503b..ce1ce512a2 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java @@ -114,35 +114,38 @@ public class Session } - public synchronized SendingLinkEndpoint createSendingLinkEndpoint(final String linkName, - final Target target, - final Source source, - AcknowledgeMode mode, - Map<Binary, Outcome> unsettled, - final DeliveryStateHandler deliveryStateHandler) - { - SendingLinkEndpoint link = this.getEndpoint().createSendingLinkEndpoint(linkName, source, target, - unsettled, deliveryStateHandler); - - switch(mode) - { - case ALO: - link.setSendingSettlementMode(SenderSettleMode.UNSETTLED); - link.setReceivingSettlementMode(ReceiverSettleMode.FIRST); - break; - case AMO: - link.setSendingSettlementMode(SenderSettleMode.SETTLED); - break; - case EO: - link.setSendingSettlementMode(SenderSettleMode.UNSETTLED); - link.setReceivingSettlementMode(ReceiverSettleMode.SECOND); - break; - - } - - link.attach(); - - return link; + public SendingLinkEndpoint createSendingLinkEndpoint(final String linkName, + final Target target, + final Source source, + AcknowledgeMode mode, + Map<Binary, Outcome> unsettled, + final DeliveryStateHandler deliveryStateHandler) + { + SessionEndpoint endpoint = this.getEndpoint(); + synchronized(endpoint.getLock()) + { + SendingLinkEndpoint link = endpoint.createSendingLinkEndpoint(linkName, source, target, + unsettled, deliveryStateHandler); + + switch(mode) + { + case ALO: + link.setSendingSettlementMode(SenderSettleMode.UNSETTLED); + link.setReceivingSettlementMode(ReceiverSettleMode.FIRST); + break; + case AMO: + link.setSendingSettlementMode(SenderSettleMode.SETTLED); + break; + case EO: + link.setSendingSettlementMode(SenderSettleMode.UNSETTLED); + link.setReceivingSettlementMode(ReceiverSettleMode.SECOND); + break; + + } + + link.attach(); + return link; + } } public Receiver createReceiver(final String sourceAddr) throws ConnectionErrorException |
