From d9514d45f11c92aef06b8b880e291f76fdbff2a2 Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Thu, 11 Dec 2014 17:14:34 +0000 Subject: Extend credit managers to be aware of transport backpressue git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1644704 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/server/flow/BytesOnlyCreditManager.java | 87 ---------- .../apache/qpid/server/flow/FlowCreditManager.java | 4 - .../qpid/server/flow/LimitlessCreditManager.java | 53 ------ .../server/flow/MessageAndBytesCreditManager.java | 90 ---------- .../qpid/server/flow/MessageOnlyCreditManager.java | 86 --------- .../qpid/server/flow/Pre0_10CreditManager.java | 192 --------------------- .../qpid/server/protocol/AMQSessionModel.java | 2 + .../protocol/MultiVersionProtocolEngine.java | 34 ++++ .../apache/qpid/server/consumer/MockConsumer.java | 8 + .../server/protocol/v0_10/ConsumerTarget_0_10.java | 8 +- .../server/protocol/v0_10/CreditCreditManager.java | 40 ++--- .../protocol/v0_10/ProtocolEngineCreator_0_10.java | 5 +- .../server/protocol/v0_10/ProtocolEngine_0_10.java | 15 ++ .../server/protocol/v0_10/ServerConnection.java | 21 +++ .../qpid/server/protocol/v0_10/ServerSession.java | 10 ++ .../protocol/v0_10/ServerSessionDelegate.java | 5 +- .../server/protocol/v0_10/WindowCreditManager.java | 38 ++-- .../protocol/v0_10/WindowCreditManagerTest.java | 12 +- .../qpid/server/protocol/v0_8/AMQChannel.java | 20 ++- .../server/protocol/v0_8/AMQProtocolEngine.java | 17 ++ .../server/protocol/v0_8/ConsumerTarget_0_8.java | 17 -- .../protocol/v0_8/MessageOnlyCreditManager.java | 73 ++++++++ .../server/protocol/v0_8/NoAckCreditManager.java | 56 ++++++ .../server/protocol/v0_8/Pre0_10CreditManager.java | 190 ++++++++++++++++++++ .../apache/qpid/server/protocol/v0_8/AckTest.java | 4 +- .../protocol/v0_8/LimitlessCreditManager.java | 47 +++++ .../qpid/server/protocol/v1_0/Connection_1_0.java | 20 ++- .../server/protocol/v1_0/ConsumerTarget_1_0.java | 10 +- .../protocol/v1_0/ProtocolEngine_1_0_0_SASL.java | 19 +- .../qpid/server/protocol/v1_0/SendingLink_1_0.java | 5 + .../qpid/server/protocol/v1_0/Session_1_0.java | 21 ++- .../apache/qpid/protocol/ServerProtocolEngine.java | 4 + 32 files changed, 611 insertions(+), 602 deletions(-) delete mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java delete mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java delete mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java delete mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java delete mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java create mode 100644 qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageOnlyCreditManager.java create mode 100644 qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java create mode 100644 qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java create mode 100644 qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/LimitlessCreditManager.java (limited to 'qpid/java') diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java deleted file mode 100644 index be3a13d2d3..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java +++ /dev/null @@ -1,87 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.flow; - - -import java.util.concurrent.atomic.AtomicLong; - -public class BytesOnlyCreditManager extends AbstractFlowCreditManager -{ - private final AtomicLong _bytesCredit; - - public BytesOnlyCreditManager(long initialCredit) - { - _bytesCredit = new AtomicLong(initialCredit); - } - - public long getMessageCredit() - { - return -1L; - } - - public long getBytesCredit() - { - return _bytesCredit.get(); - } - - public void restoreCredit(long messageCredit, long bytesCredit) - { - _bytesCredit.addAndGet(bytesCredit); - setSuspended(false); - } - - public void removeAllCredit() - { - _bytesCredit.set(0L); - } - - public boolean hasCredit() - { - return _bytesCredit.get() > 0L; - } - - public boolean useCreditForMessage(long msgSize) - { - if(hasCredit()) - { - if(_bytesCredit.addAndGet(-msgSize) >= 0) - { - return true; - } - else - { - _bytesCredit.addAndGet(msgSize); - setSuspended(true); - return false; - } - } - else - { - return false; - } - - } - - public void setBytesCredit(long bytesCredit) - { - _bytesCredit.set( bytesCredit ); - } -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java index 280f2851a4..08aac0b511 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java @@ -24,10 +24,6 @@ package org.apache.qpid.server.flow; public interface FlowCreditManager { - long getMessageCredit(); - - long getBytesCredit(); - public static interface FlowCreditManagerListener { void creditStateChanged(boolean hasCredit); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java deleted file mode 100644 index 89fc60666b..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java +++ /dev/null @@ -1,53 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.flow; - - -public class LimitlessCreditManager extends AbstractFlowCreditManager implements FlowCreditManager -{ - public long getMessageCredit() - { - return -1L; - } - - public long getBytesCredit() - { - return -1L; - } - - public void restoreCredit(long messageCredit, long bytesCredit) - { - } - - public void removeAllCredit() - { - } - - public boolean hasCredit() - { - return true; - } - - public boolean useCreditForMessage(long msgSize) - { - return true; - } -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java deleted file mode 100644 index 31c1fda968..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java +++ /dev/null @@ -1,90 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.flow; - - -public class MessageAndBytesCreditManager extends AbstractFlowCreditManager implements FlowCreditManager -{ - private long _messageCredit; - private long _bytesCredit; - - public MessageAndBytesCreditManager(final long messageCredit, final long bytesCredit) - { - _messageCredit = messageCredit; - _bytesCredit = bytesCredit; - } - - public synchronized long getMessageCredit() - { - return _messageCredit; - } - - public synchronized long getBytesCredit() - { - return _bytesCredit; - } - - public synchronized void restoreCredit(long messageCredit, long bytesCredit) - { - _messageCredit += messageCredit; - _bytesCredit += bytesCredit; - setSuspended(hasCredit()); - } - - public synchronized void removeAllCredit() - { - _messageCredit = 0L; - _bytesCredit = 0L; - setSuspended(true); - } - - public synchronized boolean hasCredit() - { - return (_messageCredit > 0L) && ( _bytesCredit > 0L ); - } - - public synchronized boolean useCreditForMessage(final long msgSize) - { - if(_messageCredit == 0L) - { - setSuspended(true); - return false; - } - else - { - if(msgSize > _bytesCredit) - { - setSuspended(true); - return false; - } - _messageCredit--; - _bytesCredit -= msgSize; - setSuspended(false); - return true; - } - - } - - public synchronized void setBytesCredit(long bytesCredit) - { - _bytesCredit = bytesCredit; - } -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java deleted file mode 100644 index 1817e8ad31..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java +++ /dev/null @@ -1,86 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.flow; - -import java.util.concurrent.atomic.AtomicLong; - -public class MessageOnlyCreditManager extends AbstractFlowCreditManager implements FlowCreditManager -{ - private final AtomicLong _messageCredit; - - public MessageOnlyCreditManager(final long initialCredit) - { - _messageCredit = new AtomicLong(initialCredit); - } - - public long getMessageCredit() - { - return _messageCredit.get(); - } - - public long getBytesCredit() - { - return -1L; - } - - public void restoreCredit(long messageCredit, long bytesCredit) - { - _messageCredit.addAndGet(messageCredit); - setSuspended(false); - - } - - public void removeAllCredit() - { - setSuspended(true); - _messageCredit.set(0L); - } - - public boolean hasCredit() - { - return _messageCredit.get() > 0L; - } - - public boolean useCreditForMessage(long msgSize) - { - if(hasCredit()) - { - if(_messageCredit.addAndGet(-1L) >= 0) - { - setSuspended(false); - return true; - } - else - { - _messageCredit.addAndGet(1L); - setSuspended(true); - return false; - } - } - else - { - setSuspended(true); - return false; - } - - } - -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java deleted file mode 100644 index fc2d4bfb53..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java +++ /dev/null @@ -1,192 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.flow; - - -public class Pre0_10CreditManager extends AbstractFlowCreditManager implements FlowCreditManager -{ - - private volatile long _bytesCreditLimit; - private volatile long _messageCreditLimit; - - private volatile long _bytesCredit; - private volatile long _messageCredit; - - public Pre0_10CreditManager(long bytesCreditLimit, long messageCreditLimit) - { - _bytesCreditLimit = bytesCreditLimit; - _messageCreditLimit = messageCreditLimit; - _bytesCredit = bytesCreditLimit; - _messageCredit = messageCreditLimit; - } - - - public synchronized void setCreditLimits(final long bytesCreditLimit, final long messageCreditLimit) - { - long bytesCreditChange = bytesCreditLimit - _bytesCreditLimit; - long messageCreditChange = messageCreditLimit - _messageCreditLimit; - - - - if(bytesCreditChange != 0L) - { - if(bytesCreditLimit == 0L) - { - _bytesCredit = 0; - } - else - { - _bytesCredit += bytesCreditChange; - } - } - - - if(messageCreditChange != 0L) - { - if(messageCreditLimit == 0L) - { - _messageCredit = 0; - } - else - { - _messageCredit += messageCreditChange; - } - } - - - _bytesCreditLimit = bytesCreditLimit; - _messageCreditLimit = messageCreditLimit; - - setSuspended(!hasCredit()); - - } - - - public long getMessageCredit() - { - return _messageCredit; - } - - public long getBytesCredit() - { - return _bytesCredit; - } - - public synchronized void restoreCredit(final long messageCredit, final long bytesCredit) - { - final long messageCreditLimit = _messageCreditLimit; - boolean notifyIncrease = true; - if(messageCreditLimit != 0L) - { - notifyIncrease = (_messageCredit != 0); - long newCredit = _messageCredit + messageCredit; - _messageCredit = newCredit > messageCreditLimit ? messageCreditLimit : newCredit; - } - - - final long bytesCreditLimit = _bytesCreditLimit; - if(bytesCreditLimit != 0L) - { - long newCredit = _bytesCredit + bytesCredit; - _bytesCredit = newCredit > bytesCreditLimit ? bytesCreditLimit : newCredit; - if(notifyIncrease && bytesCredit>0) - { - notifyIncreaseBytesCredit(); - } - } - - - - setSuspended(!hasCredit()); - - } - - public synchronized void removeAllCredit() - { - _bytesCredit = 0L; - _messageCredit = 0L; - setSuspended(!hasCredit()); - } - - public synchronized boolean hasCredit() - { - return (_bytesCreditLimit == 0L || _bytesCredit > 0) - && (_messageCreditLimit == 0L || _messageCredit > 0); - } - - public synchronized boolean useCreditForMessage(final long msgSize) - { - if(_messageCreditLimit != 0L) - { - if(_messageCredit != 0L) - { - if(_bytesCreditLimit == 0L) - { - _messageCredit--; - - return true; - } - else - { - if((_bytesCredit >= msgSize) || (_bytesCredit == _bytesCreditLimit)) - { - _messageCredit--; - _bytesCredit -= msgSize; - - return true; - } - else - { - return false; - } - } - } - else - { - setSuspended(true); - return false; - } - } - else - { - if(_bytesCreditLimit == 0L) - { - - return true; - } - else - { - if((_bytesCredit >= msgSize) || (_bytesCredit == _bytesCreditLimit)) - { - _bytesCredit -= msgSize; - - return true; - } - else - { - return false; - } - } - - } - - } -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java index f13af479ad..40aa1bbafd 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java @@ -113,4 +113,6 @@ public interface AMQSessionModel, C extends AMQCo * @return the time of the last activity or 0 if not in a transaction */ long getTransactionUpdateTime(); + + void transportStateChanged(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index 6ea9f3600c..3c25e0934c 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -157,6 +157,18 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine return _delegate.getSubject(); } + @Override + public boolean isTransportBlockedForWriting() + { + return _delegate.isTransportBlockedForWriting(); + } + + @Override + public void setTransportBlockedForWriting(final boolean blocked) + { + _delegate.setTransportBlockedForWriting(blocked); + } + private static final int MINIMUM_REQUIRED_HEADER_BYTES = 8; public void setNetworkConnection(NetworkConnection network, Sender sender) @@ -268,6 +280,17 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine { return new Subject(); } + + @Override + public boolean isTransportBlockedForWriting() + { + return false; + } + + @Override + public void setTransportBlockedForWriting(final boolean blocked) + { + } } private class SelfDelegateProtocolEngine implements ServerProtocolEngine @@ -408,6 +431,17 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine return _delegate.getSubject(); } + @Override + public boolean isTransportBlockedForWriting() + { + return false; + } + + @Override + public void setTransportBlockedForWriting(final boolean blocked) + { + } + public void exception(Throwable t) { _logger.error("Error establishing session", t); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java index 1c42d9b6fe..47ed224133 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java @@ -456,6 +456,12 @@ public class MockConsumer implements ConsumerTarget { return 0; } + + @Override + public void transportStateChanged() + { + + } } private static class MockConnectionModel implements AMQConnectionModel @@ -663,5 +669,7 @@ public class MockConsumer implements ConsumerTarget { } + + } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java index 89d681111b..afa4fb8bc0 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java @@ -158,6 +158,10 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC return _name; } + public void transportStateChanged() + { + _creditManager.restoreCredit(0, 0); + } public static class AddMessageDispositionListenerAction implements Runnable { @@ -555,10 +559,10 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC switch(flowMode) { case CREDIT: - _creditManager = new CreditCreditManager(0l,0l); + _creditManager = new CreditCreditManager(0l, 0l, _session.getConnection().getProtocolEngine()); break; case WINDOW: - _creditManager = new WindowCreditManager(0l,0l); + _creditManager = new WindowCreditManager(0l, 0l, _session.getConnection().getProtocolEngine()); break; default: // this should never happen, as 0-10 is finalised and so the enum should never change diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java index 8dddac9809..e670c1f88b 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java @@ -21,48 +21,27 @@ package org.apache.qpid.server.protocol.v0_10; +import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.flow.AbstractFlowCreditManager; public class CreditCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10 { + private final ServerProtocolEngine _serverProtocolEngine; private volatile long _bytesCredit; private volatile long _messageCredit; - public CreditCreditManager(long bytesCredit, long messageCredit) + public CreditCreditManager(long bytesCredit, long messageCredit, final ServerProtocolEngine serverProtocolEngine) { + _serverProtocolEngine = serverProtocolEngine; _bytesCredit = bytesCredit; _messageCredit = messageCredit; setSuspended(!hasCredit()); } - - public synchronized void setCreditLimits(final long bytesCredit, final long messageCredit) - { - _bytesCredit = bytesCredit; - _messageCredit = messageCredit; - - setSuspended(!hasCredit()); - - } - - - public long getMessageCredit() - { - return _messageCredit == -1L - ? Long.MAX_VALUE - : _messageCredit; - } - - public long getBytesCredit() - { - return _bytesCredit == -1L - ? Long.MAX_VALUE - : _bytesCredit; - } - public synchronized void restoreCredit(final long messageCredit, final long bytesCredit) { + setSuspended(!hasCredit()); } @@ -107,12 +86,17 @@ public class CreditCreditManager extends AbstractFlowCreditManager implements Fl public synchronized boolean hasCredit() { // Note !=, if credit is < 0 that indicates infinite credit - return (_bytesCredit != 0L && _messageCredit != 0L); + return (_bytesCredit != 0L && _messageCredit != 0L && !_serverProtocolEngine.isTransportBlockedForWriting()); } public synchronized boolean useCreditForMessage(long msgSize) { - if(_messageCredit >= 0L) + if (_serverProtocolEngine.isTransportBlockedForWriting()) + { + setSuspended(true); + return false; + } + else if(_messageCredit >= 0L) { if(_messageCredit > 0) { diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java index 30aecdb2d2..5c919252b8 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java @@ -86,7 +86,10 @@ public class ProtocolEngineCreator_0_10 implements ProtocolEngineCreator conn.setRemoteAddress(network.getRemoteAddress()); conn.setLocalAddress(network.getLocalAddress()); - return new ProtocolEngine_0_10( conn, network); + ProtocolEngine_0_10 protocolEngine = new ProtocolEngine_0_10(conn, network); + conn.setProtocolEngine(protocolEngine); + + return protocolEngine; } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java index 4adf472c5d..cb96870e74 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java @@ -54,6 +54,7 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol private long _createTime = System.currentTimeMillis(); private long _lastReadTime = _createTime; private long _lastWriteTime = _createTime; + private volatile boolean _transportBlockedForWriting; public ProtocolEngine_0_10(ServerConnection conn, NetworkConnection network) @@ -249,4 +250,18 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol { return _connection.getAuthorizedSubject(); } + + @Override + public boolean isTransportBlockedForWriting() + { + return _transportBlockedForWriting; + } + + @Override + public void setTransportBlockedForWriting(final boolean blocked) + { + _transportBlockedForWriting = blocked; + _connection.transportStateChanged(); + } + } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index 8567be37f0..cbd569d036 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicLong; import javax.security.auth.Subject; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.connection.ConnectionPrincipal; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.LogSubject; @@ -90,6 +91,8 @@ public class ServerConnection extends Connection implements AMQConnectionModel broker, final AmqpPort port, @@ -189,6 +192,16 @@ public class ServerConnection extends Connection implements AMQConnectionModel getVirtualHost() { return _virtualHost; @@ -664,4 +677,12 @@ public class ServerConnection extends Connection implements AMQConnectionModel _bytesUsed) - && (_messageCreditLimit < 0L || _messageCreditLimit > _messageUsed); + && (_messageCreditLimit < 0L || _messageCreditLimit > _messageUsed) + && !_serverProtocolEngine.isTransportBlockedForWriting(); } public synchronized boolean useCreditForMessage(final long msgSize) { - if(_messageCreditLimit >= 0L) + if (_serverProtocolEngine.isTransportBlockedForWriting()) + { + setSuspended(true); + return false; + } + else if(_messageCreditLimit >= 0L) { if(_messageUsed < _messageCreditLimit) { diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java index 1c4a694be6..b05edc5d04 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java @@ -20,17 +20,25 @@ */ package org.apache.qpid.server.protocol.v0_10; -import org.apache.qpid.server.protocol.v0_10.WindowCreditManager; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.test.utils.QpidTestCase; public class WindowCreditManagerTest extends QpidTestCase { private WindowCreditManager _creditManager; + private ServerProtocolEngine _protocolEngine; protected void setUp() throws Exception { super.setUp(); - _creditManager = new WindowCreditManager(); + + _protocolEngine = mock(ServerProtocolEngine.class); + when(_protocolEngine.isTransportBlockedForWriting()).thenReturn(false); + + _creditManager = new WindowCreditManager(0l, 0l, _protocolEngine); } /** diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 7604662980..9a6059ccbf 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -66,8 +66,6 @@ import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.filter.SimpleFilterManager; import org.apache.qpid.server.flow.FlowCreditManager; -import org.apache.qpid.server.flow.MessageOnlyCreditManager; -import org.apache.qpid.server.flow.Pre0_10CreditManager; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.messages.ChannelMessages; @@ -131,7 +129,8 @@ public class AMQChannel private final int _channelId; - private final Pre0_10CreditManager _creditManager = new Pre0_10CreditManager(0l,0l); + private final Pre0_10CreditManager _creditManager; + private final FlowCreditManager _noAckCreditManager; /** * The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that @@ -213,6 +212,9 @@ public class AMQChannel public AMQChannel(AMQProtocolEngine connection, int channelId, final MessageStore messageStore) { + _creditManager = new Pre0_10CreditManager(0l,0l, connection); + _noAckCreditManager = new NoAckCreditManager(connection); + _connection = connection; _channelId = channelId; @@ -699,7 +701,7 @@ public class AMQChannel if(filters != null && Boolean.TRUE.equals(filters.get(AMQPFilterTypes.NO_CONSUME.getValue()))) { - target = ConsumerTarget_0_8.createBrowserTarget(this, tag, filters, _creditManager); + target = ConsumerTarget_0_8.createBrowserTarget(this, tag, filters, _noAckCreditManager); } else if(acks) { @@ -709,7 +711,7 @@ public class AMQChannel } else { - target = ConsumerTarget_0_8.createNoAckTarget(this, tag, filters, _creditManager); + target = ConsumerTarget_0_8.createNoAckTarget(this, tag, filters, _noAckCreditManager); options.add(ConsumerImpl.Option.ACQUIRES); options.add(ConsumerImpl.Option.SEES_REQUEUES); } @@ -1644,6 +1646,7 @@ public class AMQChannel } } + public synchronized void block(AMQQueue queue) { if(_blockingEntities.add(queue)) @@ -1671,6 +1674,13 @@ public class AMQChannel } } + @Override + public void transportStateChanged() + { + _creditManager.restoreCredit(0, 0); + _noAckCreditManager.restoreCredit(0, 0); + } + @Override public Object getConnectionReference() { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 606649445d..cea9b0930f 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -188,6 +188,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, private int _currentMethodId; private int _binaryDataLimit; private long _maxMessageSize; + private volatile boolean _transportBlockedForWriting; public AMQProtocolEngine(Broker broker, final NetworkConnection network, @@ -250,6 +251,22 @@ public class AMQProtocolEngine implements ServerProtocolEngine, return _authorizedSubject; } + @Override + public boolean isTransportBlockedForWriting() + { + return _transportBlockedForWriting; + } + + @Override + public void setTransportBlockedForWriting(final boolean blocked) + { + _transportBlockedForWriting = blocked; + for(AMQChannel channel : _channelMap.values()) + { + channel.transportStateChanged(); + } + } + public void setNetworkConnection(NetworkConnection network) { setNetworkConnection(network, network.getSender()); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java index 43982db2fd..d6642aef2e 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java @@ -136,12 +136,6 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen } - @Override - public boolean allocateCredit(ServerMessage msg) - { - return true; - } - } public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel, @@ -215,12 +209,6 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen } - @Override - public boolean allocateCredit(ServerMessage msg) - { - return true; - } - private static final ServerTransaction.Action NOOP = new ServerTransaction.Action() { @@ -250,11 +238,6 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod); } - public boolean allocateCredit(ServerMessage msg) - { - return getCreditManager().useCreditForMessage(msg.getSize()); - } - } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageOnlyCreditManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageOnlyCreditManager.java new file mode 100644 index 0000000000..af54c911dc --- /dev/null +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageOnlyCreditManager.java @@ -0,0 +1,73 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.protocol.v0_8; + +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.qpid.server.flow.AbstractFlowCreditManager; +import org.apache.qpid.server.flow.FlowCreditManager; + +public class MessageOnlyCreditManager extends AbstractFlowCreditManager implements FlowCreditManager +{ + private final AtomicLong _messageCredit; + + public MessageOnlyCreditManager(final long initialCredit) + { + _messageCredit = new AtomicLong(initialCredit); + } + + public void restoreCredit(long messageCredit, long bytesCredit) + { + _messageCredit.addAndGet(messageCredit); + setSuspended(false); + + } + + public boolean hasCredit() + { + return _messageCredit.get() > 0L; + } + + public boolean useCreditForMessage(long msgSize) + { + if(hasCredit()) + { + if(_messageCredit.addAndGet(-1L) >= 0) + { + setSuspended(false); + return true; + } + else + { + _messageCredit.addAndGet(1L); + setSuspended(true); + return false; + } + } + else + { + setSuspended(true); + return false; + } + + } + +} diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java new file mode 100644 index 0000000000..2d32617106 --- /dev/null +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.server.protocol.v0_8; + +import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.flow.AbstractFlowCreditManager; + +public class NoAckCreditManager extends AbstractFlowCreditManager +{ + private final ServerProtocolEngine _serverProtocolEngine; + + public NoAckCreditManager(ServerProtocolEngine serverProtocolEngine) + { + _serverProtocolEngine = serverProtocolEngine; + } + + @Override + public void restoreCredit(final long messageCredit, final long bytesCredit) + { + setSuspended(!hasCredit()); + } + + @Override + public boolean hasCredit() + { + return !_serverProtocolEngine.isTransportBlockedForWriting(); + } + + @Override + public boolean useCreditForMessage(final long msgSize) + { + if (!hasCredit()) + { + setSuspended(true); + return false; + } + return true; + } +} diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java new file mode 100644 index 0000000000..e63645ed09 --- /dev/null +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java @@ -0,0 +1,190 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.protocol.v0_8; + + +import org.apache.qpid.protocol.ServerProtocolEngine; +import org.apache.qpid.server.flow.AbstractFlowCreditManager; +import org.apache.qpid.server.flow.FlowCreditManager; + +public class Pre0_10CreditManager extends AbstractFlowCreditManager implements FlowCreditManager +{ + + private final ServerProtocolEngine _protocolEngine; + private volatile long _bytesCreditLimit; + private volatile long _messageCreditLimit; + + private volatile long _bytesCredit; + private volatile long _messageCredit; + + public Pre0_10CreditManager(long bytesCreditLimit, + long messageCreditLimit, + ServerProtocolEngine protocolEngine) + { + _protocolEngine = protocolEngine; + _bytesCreditLimit = bytesCreditLimit; + _messageCreditLimit = messageCreditLimit; + _bytesCredit = bytesCreditLimit; + _messageCredit = messageCreditLimit; + } + + + + public synchronized void setCreditLimits(final long bytesCreditLimit, final long messageCreditLimit) + { + long bytesCreditChange = bytesCreditLimit - _bytesCreditLimit; + long messageCreditChange = messageCreditLimit - _messageCreditLimit; + + + + if(bytesCreditChange != 0L) + { + if(bytesCreditLimit == 0L) + { + _bytesCredit = 0; + } + else + { + _bytesCredit += bytesCreditChange; + } + } + + + if(messageCreditChange != 0L) + { + if(messageCreditLimit == 0L) + { + _messageCredit = 0; + } + else + { + _messageCredit += messageCreditChange; + } + } + + + _bytesCreditLimit = bytesCreditLimit; + _messageCreditLimit = messageCreditLimit; + + setSuspended(!hasCredit()); + + } + + + public synchronized void restoreCredit(final long messageCredit, final long bytesCredit) + { + final long messageCreditLimit = _messageCreditLimit; + boolean notifyIncrease = true; + if(messageCreditLimit != 0L) + { + notifyIncrease = (_messageCredit != 0); + long newCredit = _messageCredit + messageCredit; + _messageCredit = newCredit > messageCreditLimit ? messageCreditLimit : newCredit; + } + + + final long bytesCreditLimit = _bytesCreditLimit; + if(bytesCreditLimit != 0L) + { + long newCredit = _bytesCredit + bytesCredit; + _bytesCredit = newCredit > bytesCreditLimit ? bytesCreditLimit : newCredit; + if(notifyIncrease && bytesCredit>0) + { + notifyIncreaseBytesCredit(); + } + } + + + + setSuspended(!hasCredit()); + + } + + public synchronized boolean hasCredit() + { + return (_bytesCreditLimit == 0L || _bytesCredit > 0) + && (_messageCreditLimit == 0L || _messageCredit > 0) + && !_protocolEngine.isTransportBlockedForWriting(); + } + + public synchronized boolean useCreditForMessage(final long msgSize) + { + if (_protocolEngine.isTransportBlockedForWriting()) + { + setSuspended(true); + return false; + } + else if(_messageCreditLimit != 0L) + { + if(_messageCredit != 0L) + { + if(_bytesCreditLimit == 0L) + { + _messageCredit--; + + return true; + } + else + { + if((_bytesCredit >= msgSize) || (_bytesCredit == _bytesCreditLimit)) + { + _messageCredit--; + _bytesCredit -= msgSize; + + return true; + } + else + { + return false; + } + } + } + else + { + setSuspended(true); + return false; + } + } + else + { + if(_bytesCreditLimit == 0L) + { + + return true; + } + else + { + if((_bytesCredit >= msgSize) || (_bytesCredit == _bytesCreditLimit)) + { + _bytesCredit -= msgSize; + + return true; + } + else + { + return false; + } + } + + } + + } +} diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java index 9326f16703..55fc865850 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java @@ -31,8 +31,6 @@ import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.MessagePublishInfo; import org.apache.qpid.server.consumer.ConsumerImpl; -import org.apache.qpid.server.flow.LimitlessCreditManager; -import org.apache.qpid.server.flow.Pre0_10CreditManager; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.StoredMessage; @@ -328,7 +326,7 @@ public class AckTest extends QpidTestCase public void testMessageDequeueRestoresCreditTest() throws Exception { // Send 10 messages - Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1); + Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1, _protocolEngine); _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, creditManager); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/LimitlessCreditManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/LimitlessCreditManager.java new file mode 100644 index 0000000000..c4c89ac24a --- /dev/null +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/LimitlessCreditManager.java @@ -0,0 +1,47 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.protocol.v0_8; + + +import org.apache.qpid.server.flow.AbstractFlowCreditManager; +import org.apache.qpid.server.flow.FlowCreditManager; + +public class LimitlessCreditManager extends AbstractFlowCreditManager implements FlowCreditManager +{ + + public void restoreCredit(long messageCredit, long bytesCredit) + { + } + + public void removeAllCredit() + { + } + + public boolean hasCredit() + { + return true; + } + + public boolean useCreditForMessage(long msgSize) + { + return true; + } +} diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java index 8e24d55da0..b55bd03a91 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java @@ -44,6 +44,7 @@ import org.apache.qpid.amqp_1_0.type.transport.AmqpError; import org.apache.qpid.amqp_1_0.type.transport.End; import org.apache.qpid.amqp_1_0.type.transport.Error; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.connection.ConnectionPrincipal; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.model.Broker; @@ -64,6 +65,7 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod private final AmqpPort _port; private final Broker _broker; private final SubjectCreator _subjectCreator; + private final ServerProtocolEngine _protocolEngine; private VirtualHostImpl _vhost; private final Transport _transport; private final ConnectionEndpoint _conn; @@ -101,12 +103,16 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod private boolean _closedOnOpen; + public Connection_1_0(Broker broker, ConnectionEndpoint conn, long connectionId, AmqpPort port, - Transport transport, final SubjectCreator subjectCreator) + Transport transport, + final SubjectCreator subjectCreator, + final ServerProtocolEngine protocolEngine) { + _protocolEngine = protocolEngine; _broker = broker; _port = port; _transport = transport; @@ -363,6 +369,11 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod return _port; } + public ServerProtocolEngine getProtocolEngine() + { + return _protocolEngine; + } + @Override public Transport getTransport() { @@ -480,4 +491,11 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod } + public void transportStateChanged() + { + for (Session_1_0 session : _sessions) + { + session.transportStateChanged(); + } + } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java index c5d9a5e35d..b5e1bdafbb 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java @@ -40,6 +40,7 @@ import org.apache.qpid.amqp_1_0.type.messaging.Released; import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode; import org.apache.qpid.amqp_1_0.type.transport.Transfer; +import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.consumer.AbstractConsumerTarget; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; @@ -84,7 +85,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget public boolean isSuspended() { - return _link.getSession().getConnectionModel().isStopped() || getState() != State.ACTIVE;// || !getEndpoint().hasCreditToSend(); + return _link.getSession().getConnectionModel().isStopped() || getState() != State.ACTIVE; } @@ -290,7 +291,9 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget { synchronized (_link.getLock()) { - final boolean hasCredit = _link.isAttached() && getEndpoint().hasCreditToSend(); + + ServerProtocolEngine protocolEngine = getSession().getConnection().getProtocolEngine(); + final boolean hasCredit = _link.isAttached() && getEndpoint().hasCreditToSend() && !protocolEngine.isTransportBlockedForWriting(); if(!hasCredit && getState() == State.ACTIVE) { suspend(); @@ -330,7 +333,8 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget { synchronized(_link.getLock()) { - if(isSuspended() && getEndpoint() != null) + ServerProtocolEngine protocolEngine = getSession().getConnection().getProtocolEngine(); + if(isSuspended() && getEndpoint() != null && !protocolEngine.isTransportBlockedForWriting()) { updateState(State.SUSPENDED, State.ACTIVE); _transactionId = _link.getTransactionId(); diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java index 3bbfaac466..b2783a2da2 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java @@ -118,6 +118,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut private NetworkConnection _network; private Sender _sender; private Connection_1_0 _connection; + private volatile boolean _transportBlockedForWriting; static enum State { @@ -216,7 +217,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut _endpoint.setProperties(serverProperties); _endpoint.setRemoteAddress(getRemoteAddress()); - _connection = new Connection_1_0(_broker, _endpoint, _connectionId, _port, _transport, subjectCreator); + _connection = new Connection_1_0(_broker, _endpoint, _connectionId, _port, _transport, subjectCreator, this); _endpoint.setConnectionEventListener(_connection); _endpoint.setFrameOutputHandler(this); @@ -529,6 +530,8 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut } + + public void close() { _sender.close(); @@ -559,4 +562,18 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut { return _lastWriteTime; } + + @Override + public boolean isTransportBlockedForWriting() + { + return _transportBlockedForWriting; + } + @Override + public void setTransportBlockedForWriting(final boolean blocked) + { + _transportBlockedForWriting = blocked; + _connection.transportStateChanged(); + + } + } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index 2cfe431979..f8e4853099 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -728,4 +728,9 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS { return _consumer; } + + public ConsumerTarget_1_0 getConsumerTarget() + { + return _target; + } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index 8d71f980e5..f5827a3766 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -109,6 +109,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel> _consumers = new CopyOnWriteArrayList>(); + private final CopyOnWriteArrayList _sendingLinks = new CopyOnWriteArrayList<>(); private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener(); private final CopyOnWriteArrayList _consumerListeners = new CopyOnWriteArrayList(); private Session _modelObject; @@ -211,7 +212,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel) { Consumer modelConsumer = (Consumer) consumer; _consumers.add(modelConsumer); + _sendingLinks.add(link); modelConsumer.addChangeListener(_consumerClosedListener); consumerAdded(modelConsumer); } @@ -608,6 +611,20 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel