diff options
| author | Keith Wall <kwall@apache.org> | 2015-02-10 18:10:16 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2015-02-10 18:10:16 +0000 |
| commit | f5ee46517eb096030a6c44b14b801eb2aaeb9392 (patch) | |
| tree | 25544486642cc770061489663dba650d85769404 /qpid/java/broker-plugins | |
| parent | 085486ebe5ff21133b9caf1c31625ac6ea356568 (diff) | |
| download | qpid-python-f5ee46517eb096030a6c44b14b801eb2aaeb9392.tar.gz | |
Refactoring: make the queue no longer be responsible for pushing messages onto the wire
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1658773 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
12 files changed, 181 insertions, 40 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java index afa4fb8bc0..209f6663ec 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java @@ -104,7 +104,8 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC _name = name; } - public boolean isSuspended() + @Override + public boolean doIsSuspended() { return getState()!=State.ACTIVE || _deleted.get() || _session.isClosing() || _session.getConnectionModel().isStopped(); // TODO check for Session suspension } @@ -195,7 +196,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC private final AddMessageDispositionListenerAction _postIdSettingAction; - public long send(final ConsumerImpl consumer, final MessageInstance entry, boolean batch) + public void doSend(final ConsumerImpl consumer, final MessageInstance entry, boolean batch) { ServerMessage serverMsg = entry.getMessage(); @@ -346,7 +347,6 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC { recordUnacknowledged(entry); } - return size; } void recordUnacknowledged(MessageInstance entry) diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java index 401c6fc939..3e8ba7cfab 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java @@ -32,6 +32,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.Constant; import org.apache.qpid.transport.network.Assembler; @@ -55,6 +56,8 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol private long _lastWriteTime = _createTime; private volatile boolean _transportBlockedForWriting; + private volatile boolean _messageAssignmentSuspended; + public ProtocolEngine_0_10(ServerConnection conn, NetworkConnection network) { @@ -67,6 +70,20 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol } } + @Override + public boolean isMessageAssignmentSuspended() + { + return _messageAssignmentSuspended; + } + + @Override + public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended) + { + _messageAssignmentSuspended = messageAssignmentSuspended; + } + + + public void setNetworkConnection(final NetworkConnection network, final ByteBufferSender sender) { if(!getSubject().equals(Subject.getSubject(AccessController.getContext()))) @@ -252,4 +269,12 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol _connection.transportStateChanged(); } + @Override + public void processPendingMessages() + { + for (AMQSessionModel session : _connection.getSessionModels()) + { + session.processPendingMessages(); + } + } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index cbd569d036..d9b4495d6e 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -685,4 +685,17 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S ssn.transportStateChanged(); } } + + @Override + public void flushBatched() + { + getSender().flush(); + } + + + @Override + public boolean isMessageAssignmentSuspended() + { + return _serverProtocolEngine.isMessageAssignmentSuspended(); + } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index 1d8676edd6..3659d6ce01 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -1135,6 +1135,15 @@ public class ServerSession extends Session } } + @Override + public void processPendingMessages() + { + for(ConsumerTarget target : getSubscriptions()) + { + target.processPendingMessages(); + } + } + public final long getMaxUncommittedInMemorySize() { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index a149214455..be28024d13 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -3606,4 +3606,14 @@ public class AMQChannel } } } + + @Override + public void processPendingMessages() + { + + for(ConsumerTarget target : _tag2SubscriptionTargetMap.values()) + { + target.processPendingMessages(); + } + } } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 233f68aeb6..1b69edb50e 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -202,6 +202,22 @@ public class AMQProtocolEngine implements ServerProtocolEngine, private long _maxMessageSize; private volatile boolean _transportBlockedForWriting; + private volatile boolean _messageAssignmentSuspended; + + + @Override + public boolean isMessageAssignmentSuspended() + { + return _messageAssignmentSuspended; + } + + @Override + public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended) + { + _messageAssignmentSuspended = messageAssignmentSuspended; + } + + public AMQProtocolEngine(Broker<?> broker, final NetworkConnection network, final long connectionId, @@ -331,9 +347,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, { final long arrivalTime = System.currentTimeMillis(); - if(!_authenticated && - (arrivalTime - _creationTime) > _port.getContextValue(Long.class, - Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY)) + if (!_authenticated && + (arrivalTime - _creationTime) > _port.getContextValue(Long.class, + Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY)) { _logger.warn("Connection has taken more than " + _port.getContextValue(Long.class, Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY) @@ -388,7 +404,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, } catch (StoreException e) { - if(_virtualHost.getState() == State.ACTIVE) + if (_virtualHost.getState() == State.ACTIVE) { throw e; } @@ -1362,7 +1378,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, { closeConnection(0, new AMQConnectionException(cause, message, 0, 0, getMethodRegistry(), - null)); + null)); } public void block() @@ -2049,4 +2065,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, return _closing.get(); } + @Override + public void processPendingMessages() + { + for (AMQSessionModel session : getSessionModels()) + { + session.processPendingMessages(); + } + } } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java index d6642aef2e..d33a4aafd8 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.protocol.v0_8; import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -33,6 +34,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.consumer.AbstractConsumerTarget; import org.apache.qpid.server.consumer.ConsumerImpl; +import org.apache.qpid.server.consumer.ConsumerMessageInstancePair; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageInstance; @@ -99,6 +101,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen return _consumers; } + static final class BrowserConsumer extends ConsumerTarget_0_8 { public BrowserConsumer(AMQChannel channel, @@ -123,7 +126,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen * @throws org.apache.qpid.AMQException */ @Override - public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch) + public void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch) { // We don't decrement the reference here as we don't want to consume the message // but we do want to send it to the client. @@ -131,7 +134,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen synchronized (getChannel()) { long deliveryTag = getChannel().getNextDeliveryTag(); - return sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag); + sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag); } } @@ -178,7 +181,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen * @param batch */ @Override - public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch) + public void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch) { // if we do not need to wait for client acknowledgements // we can decrement the reference count immediately. @@ -205,7 +208,6 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen } ref.release(); - return size; } @@ -278,9 +280,10 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen * @param batch */ @Override - public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch) + public void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch) { + // put queue entry on a list and then notify the connection to read list. synchronized (getChannel()) { @@ -292,12 +295,15 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen entry.addStateChangeListener(getReleasedStateChangeListener()); long size = sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag); entry.incrementDeliveryCount(); - return size; } + + } + + } @@ -382,7 +388,8 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen return subscriber + "]"; } - public boolean isSuspended() + @Override + public boolean doIsSuspended() { return getState()!=State.ACTIVE || _channel.isSuspended() || _deleted.get() || _channel.getConnectionModel().isStopped(); } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java index b55bd03a91..b6c23dff7a 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java @@ -65,7 +65,7 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod private final AmqpPort<?> _port; private final Broker<?> _broker; private final SubjectCreator _subjectCreator; - private final ServerProtocolEngine _protocolEngine; + private final ProtocolEngine_1_0_0_SASL _protocolEngine; private VirtualHostImpl _vhost; private final Transport _transport; private final ConnectionEndpoint _conn; @@ -110,7 +110,7 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod AmqpPort<?> port, Transport transport, final SubjectCreator subjectCreator, - final ServerProtocolEngine protocolEngine) + final ProtocolEngine_1_0_0_SASL protocolEngine) { _protocolEngine = protocolEngine; _broker = broker; @@ -498,4 +498,16 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod session.transportStateChanged(); } } + + @Override + public void flushBatched() + { + _protocolEngine.flushBatched(); + } + + @Override + public boolean isMessageAssignmentSuspended() + { + return _protocolEngine.isMessageAssignmentSuspended(); + } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java index a44768ffdc..589bd0ec04 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java @@ -83,7 +83,8 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget return _link.getEndpoint(); } - public boolean isSuspended() + @Override + public boolean doIsSuspended() { return _link.getSession().getConnectionModel().isStopped() || getState() != State.ACTIVE; @@ -113,22 +114,10 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget } } - public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch) + public void doSend(final ConsumerImpl consumer, final MessageInstance entry, boolean batch) { // TODO - long size = entry.getMessage().getSize(); - send(entry); - return size; - } - - public void flushBatched() - { - // TODO - } - - public void send(final MessageInstance queueEntry) - { - ServerMessage serverMessage = queueEntry.getMessage(); + ServerMessage serverMessage = entry.getMessage(); Message_1_0 message; if(serverMessage instanceof Message_1_0) { @@ -168,7 +157,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget payload.flip(); } - if(queueEntry.getDeliveryCount() != 0) + if(entry.getDeliveryCount() != 0) { payload = payload.duplicate(); ValueHandler valueHandler = new ValueHandler(_typeRegistry); @@ -200,7 +189,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget header.setPriority(oldHeader.getPriority()); header.setTtl(oldHeader.getTtl()); } - header.setDeliveryCount(UnsignedInteger.valueOf(queueEntry.getDeliveryCount())); + header.setDeliveryCount(UnsignedInteger.valueOf(entry.getDeliveryCount())); _sectionEncoder.reset(); _sectionEncoder.encodeObject(header); Binary encodedHeader = _sectionEncoder.getEncoding(); @@ -230,10 +219,10 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget else { UnsettledAction action = _acquires - ? new DispositionAction(tag, queueEntry) - : new DoNothingAction(tag, queueEntry); + ? new DispositionAction(tag, entry) + : new DoNothingAction(tag, entry); - _link.addUnsettled(tag, action, queueEntry); + _link.addUnsettled(tag, action, entry); } if(_transactionId != null) @@ -257,9 +246,9 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget public void onRollback() { - if(queueEntry.isAcquiredBy(getConsumer())) + if(entry.isAcquiredBy(getConsumer())) { - queueEntry.release(); + entry.release(); _link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true); @@ -274,12 +263,17 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget } else { - queueEntry.release(); + entry.release(); } } } + public void flushBatched() + { + // TODO + } + public void queueDeleted() { //TODO diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java index 147ccd4edd..57f070804a 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java @@ -56,6 +56,7 @@ import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.model.port.AmqpPort; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.UsernamePrincipal; import org.apache.qpid.server.util.ServerScopedRuntimeException; @@ -135,6 +136,10 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut private State _state = State.A; + private volatile boolean _messageAssignmentSuspended; + + + public ProtocolEngine_1_0_0_SASL(final NetworkConnection networkDriver, final Broker<?> broker, long id, AmqpPort<?> port, Transport transport) @@ -150,6 +155,19 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut } + @Override + public boolean isMessageAssignmentSuspended() + { + return _messageAssignmentSuspended; + } + + @Override + public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended) + { + _messageAssignmentSuspended = messageAssignmentSuspended; + } + + public SocketAddress getRemoteAddress() { return _network.getRemoteAddress(); @@ -576,4 +594,17 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut } + public void flushBatched() + { + _sender.flush(); + } + + @Override + public void processPendingMessages() + { + for (AMQSessionModel session : _connection.getSessionModels()) + { + session.processPendingMessages(); + } + } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index 01c11b9eca..dd03469d0f 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -899,6 +899,16 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio return 0L; } + @Override + public void processPendingMessages() + { + for(Consumer<?> consumer : getConsumers()) + { + + ((ConsumerImpl)consumer).getTarget().processPendingMessages(); + } + } + private void consumerAdded(Consumer<?> consumer) { for(ConsumerListener l : _consumerListeners) diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java index 3f873a24ff..c03dc4e1be 100644 --- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java @@ -164,6 +164,12 @@ class ManagementNodeConsumer implements ConsumerImpl } + @Override + public ConsumerTarget getTarget() + { + return _target; + } + ManagementNode getManagementNode() { return _managementNode; |
