diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-08-16 15:21:41 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-08-16 15:21:41 +0000 |
| commit | c0e454cf882c7af8292832d6233940c56cc6a881 (patch) | |
| tree | 317fcecc377ef9899cbb3760dfbf600295d6beb7 /qpid/java/broker-plugins | |
| parent | a22fa634fe3a3f51d1a27078e17cba82e48fcf46 (diff) | |
| download | qpid-python-c0e454cf882c7af8292832d6233940c56cc6a881.tar.gz | |
QPID-6000 : [Java Broker] [Java Client] add the ability to configure automatic message compression
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1618375 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
14 files changed, 286 insertions, 76 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java index 7ab3fbb1f5..ec0c38ec42 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.protocol.v0_10; +import java.nio.ByteBuffer; import java.util.Collections; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; @@ -51,6 +52,7 @@ import org.apache.qpid.transport.MessageProperties; import org.apache.qpid.transport.MessageTransfer; import org.apache.qpid.transport.Method; import org.apache.qpid.transport.Option; +import org.apache.qpid.util.GZIPUtils; public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowCreditManager.FlowCreditManagerListener { @@ -198,7 +200,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC private final AddMessageDispositionListenerAction _postIdSettingAction; - public void send(final MessageInstance entry, boolean batch) + public long send(final MessageInstance entry, boolean batch) { ServerMessage serverMsg = entry.getMessage(); @@ -264,11 +266,44 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC deliveryProps.setRedelivered(entry.isRedelivered()); - Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties()); + boolean msgCompressed = messageProps != null && GZIPUtils.GZIP_CONTENT_ENCODING.equals(messageProps.getContentEncoding()); + + ByteBuffer body = msg.getBody(); - xfr = batch ? new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header,msg.getBody(), BATCHED) - : new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header,msg.getBody()); + boolean compressionSupported = _session.getConnection().getConnectionDelegate().isCompressionSupported(); + + if(msgCompressed && !compressionSupported) + { + byte[] uncompressed = GZIPUtils.uncompressBufferToArray(body); + if(uncompressed != null) + { + messageProps.setContentEncoding(null); + body = ByteBuffer.wrap(uncompressed); + } + } + else if(!msgCompressed + && compressionSupported + && (messageProps == null || messageProps.getContentEncoding()==null) + && body.remaining() > _session.getConnection().getMessageCompressionThreshold()) + { + byte[] compressed = GZIPUtils.compressBufferToArray(body); + if(compressed != null) + { + if(messageProps == null) + { + messageProps = new MessageProperties(); + } + messageProps.setContentEncoding(GZIPUtils.GZIP_CONTENT_ENCODING); + body = ByteBuffer.wrap(compressed); + } + } + long size = body == null ? 0 : body.remaining(); + + Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties()); + + xfr = batch ? new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header, body, BATCHED) + : new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header, body); if(_acceptMode == MessageAcceptMode.NONE && _acquireMode != MessageAcquireMode.PRE_ACQUIRED) { @@ -311,7 +346,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC { recordUnacknowledged(entry); } - + return size; } void recordUnacknowledged(MessageInstance entry) diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index 8ddd04f51a..60bb5c6112 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -74,7 +74,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; private final long _connectionId; private final Object _reference = new Object(); - private VirtualHostImpl _virtualHost; + private VirtualHostImpl<?,?,?> _virtualHost; private Port<?> _port; private AtomicLong _lastIoTime = new AtomicLong(); private boolean _blocking; @@ -87,6 +87,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S new CopyOnWriteArrayList<SessionModelListener>(); private volatile boolean _stopped; + private int _messageCompressionThreshold; public ServerConnection(final long connectionId, Broker broker) { @@ -172,14 +173,22 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S super.setConnectionDelegate(delegate); } - public VirtualHostImpl getVirtualHost() + public VirtualHostImpl<?,?,?> getVirtualHost() { return _virtualHost; } - public void setVirtualHost(VirtualHostImpl virtualHost) + public void setVirtualHost(VirtualHostImpl<?,?,?> virtualHost) { _virtualHost = virtualHost; + _messageCompressionThreshold = + virtualHost.getContextValue(Integer.class, + Broker.MESSAGE_COMPRESSION_THRESHOLD_SIZE); + + if(_messageCompressionThreshold <= 0) + { + _messageCompressionThreshold = Integer.MAX_VALUE; + } } @Override @@ -639,4 +648,9 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S { _taskList.remove(task); } + + public int getMessageCompressionThreshold() + { + return _messageCompressionThreshold; + } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java index bab2d802e8..cc9d66756b 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java @@ -64,6 +64,8 @@ public class ServerConnectionDelegate extends ServerDelegate private final SubjectCreator _subjectCreator; private int _maximumFrameSize; + private boolean _compressionSupported; + public ServerConnectionDelegate(Broker<?> broker, String localFQDN, SubjectCreator subjectCreator) { this(createConnectionProperties(broker), Collections.singletonList((Object)"en_US"), broker, localFQDN, subjectCreator); @@ -111,6 +113,7 @@ public class ServerConnectionDelegate extends ServerDelegate map.put(ServerPropertyNames.VERSION, QpidProperties.getReleaseVersion()); map.put(ServerPropertyNames.QPID_BUILD, QpidProperties.getBuildVersion()); map.put(ServerPropertyNames.QPID_INSTANCE_NAME, broker.getName()); + map.put(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED, String.valueOf(broker.isMessageCompressionEnabled())); return map; } @@ -366,6 +369,16 @@ public class ServerConnectionDelegate extends ServerDelegate public void connectionStartOk(Connection conn, ConnectionStartOk ok) { _clientProperties = ok.getClientProperties(); + if(_clientProperties != null) + { + Object compressionSupported = + _clientProperties.get(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED); + if (compressionSupported != null) + { + _compressionSupported = Boolean.parseBoolean(String.valueOf(compressionSupported)); + + } + } super.connectionStartOk(conn, ok); } @@ -400,4 +413,9 @@ public class ServerConnectionDelegate extends ServerDelegate int delay = (Integer)_broker.getAttribute(Broker.CONNECTION_HEART_BEAT_DELAY); return delay == 0 ? super.getHeartbeatMax() : delay; } + + public boolean isCompressionSupported() + { + return _compressionSupported && _broker.isMessageCompressionEnabled(); + } } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 1c264e52c6..c193491e1e 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -103,7 +103,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private String _clientProduct = null; private String _remoteProcessPid = null; - private VirtualHostImpl _virtualHost; + private VirtualHostImpl<?,?,?> _virtualHost; private final Map<Integer, AMQChannel<AMQProtocolEngine>> _channelMap = new HashMap<Integer, AMQChannel<AMQProtocolEngine>>(); @@ -175,6 +175,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private volatile boolean _stopped; private long _readBytes; private boolean _authenticated; + private boolean _compressionSupported; + private int _messageCompressionThreshold; public AMQProtocolEngine(Broker broker, final NetworkConnection network, @@ -208,7 +210,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return null; } }); - + _messagesDelivered = new StatisticsCounter("messages-delivered-" + getSessionID()); _dataDelivered = new StatisticsCounter("data-delivered-" + getSessionID()); _messagesReceived = new StatisticsCounter("messages-received-" + getSessionID()); @@ -539,6 +541,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _broker.getName()); serverProperties.setString(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE, String.valueOf(_closeWhenNoRoute)); + serverProperties.setString(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED, + String.valueOf(_broker.isMessageCompressionEnabled())); AMQMethodBody responseBody = getMethodRegistry().createConnectionStartBody((short) getProtocolMajorVersion(), (short) pv.getActualMinorVersion(), @@ -1131,6 +1135,15 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _logger.debug("Client set closeWhenNoRoute=" + _closeWhenNoRoute + " for protocol engine " + this); } } + String compressionSupported = clientProperties.getString(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED); + if (compressionSupported != null) + { + _compressionSupported = Boolean.parseBoolean(compressionSupported); + if(_logger.isDebugEnabled()) + { + _logger.debug("Client set compressionSupported=" + _compressionSupported + " for protocol engine " + this); + } + } _clientVersion = clientProperties.getString(ConnectionStartProperties.VERSION_0_8); _clientProduct = clientProperties.getString(ConnectionStartProperties.PRODUCT); @@ -1181,17 +1194,24 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return getMethodRegistry(); } - public VirtualHostImpl getVirtualHost() + public VirtualHostImpl<?,?,?> getVirtualHost() { return _virtualHost; } - public void setVirtualHost(VirtualHostImpl virtualHost) throws AMQException + public void setVirtualHost(VirtualHostImpl<?,?,?> virtualHost) throws AMQException { _virtualHost = virtualHost; _virtualHost.getConnectionRegistry().registerConnection(this); + + _messageCompressionThreshold = virtualHost.getContextValue(Integer.class, + Broker.MESSAGE_COMPRESSION_THRESHOLD_SIZE); + if(_messageCompressionThreshold <= 0) + { + _messageCompressionThreshold = Integer.MAX_VALUE; + } } public void addDeleteTask(Action<? super AMQProtocolEngine> task) @@ -1595,15 +1615,16 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } @Override - public void deliverToClient(final ConsumerImpl sub, final ServerMessage message, + public long deliverToClient(final ConsumerImpl sub, final ServerMessage message, final InstanceProperties props, final long deliveryTag) { - registerMessageDelivered(message.getSize()); - _protocolOutputConverter.writeDeliver(message, + long size = _protocolOutputConverter.writeDeliver(message, props, _channelId, deliveryTag, new AMQShortString(sub.getName())); + registerMessageDelivered(size); + return size; } } @@ -1636,6 +1657,18 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return _closeWhenNoRoute; } + @Override + public boolean isCompressionSupported() + { + return _compressionSupported && _broker.isMessageCompressionEnabled(); + } + + @Override + public int getMessageCompressionThreshold() + { + return _messageCompressionThreshold; + } + public EventLogger getEventLogger() { if(_virtualHost != null) diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java index bab0aaf3da..8d5142338a 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java @@ -174,9 +174,9 @@ public interface AMQProtocolSession<T extends AMQProtocolSession<T>> Object getReference(); - VirtualHostImpl getVirtualHost(); + VirtualHostImpl<?,?,?> getVirtualHost(); - void setVirtualHost(VirtualHostImpl virtualHost) throws AMQException; + void setVirtualHost(VirtualHostImpl<?,?,?> virtualHost) throws AMQException; public ProtocolOutputConverter getProtocolOutputConverter(); @@ -210,4 +210,8 @@ public interface AMQProtocolSession<T extends AMQProtocolSession<T>> * can't be routed rather than returning the message. */ boolean isCloseWhenNoRoute(); + + boolean isCompressionSupported(); + + int getMessageCompressionThreshold(); } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java index fa26a73f93..c7871e8b9a 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java @@ -26,6 +26,6 @@ import org.apache.qpid.server.message.ServerMessage; public interface ClientDeliveryMethod { - void deliverToClient(final ConsumerImpl sub, final ServerMessage message, final InstanceProperties props, + long deliverToClient(final ConsumerImpl sub, final ServerMessage message, final InstanceProperties props, final long deliveryTag); } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java index 7c2efe64e6..d5eed242e7 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java @@ -116,7 +116,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen * @throws org.apache.qpid.AMQException */ @Override - public void send(MessageInstance entry, boolean batch) + public long send(MessageInstance entry, boolean batch) { // We don't decrement the reference here as we don't want to consume the message // but we do want to send it to the client. @@ -124,7 +124,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen synchronized (getChannel()) { long deliveryTag = getChannel().getNextDeliveryTag(); - sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag); + return sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag); } } @@ -177,7 +177,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen * @param batch */ @Override - public void send(MessageInstance entry, boolean batch) + public long send(MessageInstance entry, boolean batch) { // if we do not need to wait for client acknowledgements // we can decrement the reference count immediately. @@ -194,17 +194,17 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen MessageReference ref = message.newReference(); InstanceProperties props = entry.getInstanceProperties(); entry.delete(); - + long size; synchronized (getChannel()) { getChannel().getProtocolSession().setDeferFlush(batch); long deliveryTag = getChannel().getNextDeliveryTag(); - sendToClient(message, props, deliveryTag); + size = sendToClient(message, props, deliveryTag); } ref.release(); - + return size; } @@ -291,7 +291,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen * @param batch */ @Override - public void send(MessageInstance entry, boolean batch) + public long send(MessageInstance entry, boolean batch) { @@ -303,9 +303,9 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen addUnacknowledgedMessage(entry); recordMessageDelivery(entry, deliveryTag); entry.addStateChangeListener(getReleasedStateChangeListener()); - sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag); + long size = sendToClient(entry.getMessage(), entry.getInstanceProperties(), deliveryTag); entry.incrementDeliveryCount(); - + return size; } } @@ -502,9 +502,9 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen } } - protected void sendToClient(final ServerMessage message, final InstanceProperties props, final long deliveryTag) + protected long sendToClient(final ServerMessage message, final InstanceProperties props, final long deliveryTag) { - _deliveryMethod.deliverToClient(getConsumer(), message, props, deliveryTag); + return _deliveryMethod.deliverToClient(getConsumer(), message, props, deliveryTag); } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java index 0026bad063..c3bdedf44d 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java @@ -21,6 +21,9 @@ package org.apache.qpid.server.protocol.v0_8.handler; +import java.security.AccessControlException; +import java.util.EnumSet; + import org.apache.log4j.Logger; import org.apache.qpid.AMQException; @@ -30,26 +33,23 @@ import org.apache.qpid.framing.BasicGetEmptyBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.consumer.ConsumerImpl; +import org.apache.qpid.server.flow.FlowCreditManager; +import org.apache.qpid.server.flow.MessageOnlyCreditManager; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.v0_8.AMQChannel; -import org.apache.qpid.server.flow.FlowCreditManager; -import org.apache.qpid.server.flow.MessageOnlyCreditManager; import org.apache.qpid.server.protocol.v0_8.AMQMessage; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; +import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod; import org.apache.qpid.server.protocol.v0_8.ConsumerTarget_0_8; -import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; -import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod; -import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import java.security.AccessControlException; -import java.util.EnumSet; - public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetBody> { private static final Logger _log = Logger.getLogger(BasicGetMethodHandler.class); @@ -202,17 +202,18 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB } @Override - public void deliverToClient(final ConsumerImpl sub, final ServerMessage message, + public long deliverToClient(final ConsumerImpl sub, final ServerMessage message, final InstanceProperties props, final long deliveryTag) { _singleMessageCredit.useCreditForMessage(message.getSize()); - _session.getProtocolOutputConverter().writeGetOk(message, + long size =_session.getProtocolOutputConverter().writeGetOk(message, props, _channel.getChannelId(), deliveryTag, _queue.getQueueDepthMessages()); _deliveredMessage = true; + return size; } public boolean hasDeliveredMessage() diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java index 7678ce812b..4ee5cbc17d 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverter.java @@ -26,7 +26,6 @@ */ package org.apache.qpid.server.protocol.v0_8.output; -import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; @@ -35,7 +34,6 @@ import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageContentSource; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.queue.QueueEntry; public interface ProtocolOutputConverter { @@ -46,12 +44,12 @@ public interface ProtocolOutputConverter ProtocolOutputConverter newInstance(AMQProtocolSession session); } - void writeDeliver(final ServerMessage msg, + long writeDeliver(final ServerMessage msg, final InstanceProperties props, int channelId, long deliveryTag, AMQShortString consumerTag); - void writeGetOk(final ServerMessage msg, + long writeGetOk(final ServerMessage msg, final InstanceProperties props, int channelId, long deliveryTag, diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java index f786cb113a..9e41f7884c 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java @@ -20,6 +20,10 @@ */ package org.apache.qpid.server.protocol.v0_8.output; +import java.io.DataOutput; +import java.io.IOException; +import java.nio.ByteBuffer; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQBody; import org.apache.qpid.framing.AMQDataBlock; @@ -27,6 +31,7 @@ import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicCancelOkBody; +import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.BasicGetOkBody; import org.apache.qpid.framing.BasicReturnBody; import org.apache.qpid.framing.ContentHeaderBody; @@ -34,16 +39,13 @@ import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageContentSource; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.protocol.v0_8.AMQMessage; -import org.apache.qpid.server.message.MessageContentSource; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; - -import java.io.DataOutput; -import java.io.IOException; -import java.nio.ByteBuffer; +import org.apache.qpid.util.GZIPUtils; class ProtocolOutputConverterImpl implements ProtocolOutputConverter { @@ -51,6 +53,7 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter private final MethodRegistry _methodRegistry; private final AMQProtocolSession _protocolSession; + private static final AMQShortString GZIP_ENCODING = AMQShortString.valueOf(GZIPUtils.GZIP_CONTENT_ENCODING); ProtocolOutputConverterImpl(AMQProtocolSession session, MethodRegistry methodRegistry) { @@ -64,7 +67,7 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter return _protocolSession; } - public void writeDeliver(final ServerMessage m, + public long writeDeliver(final ServerMessage m, final InstanceProperties props, int channelId, long deliveryTag, AMQShortString consumerTag) @@ -72,7 +75,7 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter final AMQMessage msg = convertToAMQMessage(m); final boolean isRedelivered = Boolean.TRUE.equals(props.getProperty(InstanceProperties.Property.REDELIVERED)); AMQBody deliverBody = createEncodedDeliverBody(msg, isRedelivered, deliveryTag, consumerTag); - writeMessageDelivery(msg, channelId, deliverBody); + return writeMessageDelivery(msg, channelId, deliverBody); } private AMQMessage convertToAMQMessage(ServerMessage serverMessage) @@ -93,21 +96,97 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter return MessageConverterRegistry.getConverter(clazz, AMQMessage.class); } - private void writeMessageDelivery(AMQMessage message, int channelId, AMQBody deliverBody) + private long writeMessageDelivery(AMQMessage message, int channelId, AMQBody deliverBody) { - writeMessageDelivery(message, message.getContentHeaderBody(), channelId, deliverBody); + return writeMessageDelivery(message, message.getContentHeaderBody(), channelId, deliverBody); } - private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody) + private long writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody) { - int bodySize = (int) message.getSize(); + boolean msgCompressed = isCompressed(contentHeaderBody); + byte[] modifiedContent; + + // straight through case + boolean compressionSupported = _protocolSession.isCompressionSupported(); + + if(msgCompressed && !compressionSupported && + (modifiedContent = GZIPUtils.uncompressBufferToArray(message.getContent(0,bodySize))) != null) + { + BasicContentHeaderProperties modifiedProps = + new BasicContentHeaderProperties(contentHeaderBody.getProperties()); + modifiedProps.setEncoding((String)null); + + writeMessageDeliveryModified(channelId, deliverBody, modifiedProps, modifiedContent); + + return modifiedContent.length; + } + else if(!msgCompressed + && compressionSupported + && contentHeaderBody.getProperties().getEncoding()==null + && bodySize > _protocolSession.getMessageCompressionThreshold() + && (modifiedContent = GZIPUtils.compressBufferToArray(message.getContent(0, bodySize))) != null) + { + BasicContentHeaderProperties modifiedProps = + new BasicContentHeaderProperties(contentHeaderBody.getProperties()); + modifiedProps.setEncoding(GZIP_ENCODING); + + writeMessageDeliveryModified(channelId, deliverBody, modifiedProps, modifiedContent); + + return modifiedContent.length; + } + else + { + writeMessageDeliveryUnchanged(message, contentHeaderBody, channelId, deliverBody, bodySize); + + return bodySize; + } + } - if(bodySize == 0) + private int writeMessageDeliveryModified(final int channelId, + final AMQBody deliverBody, + final BasicContentHeaderProperties modifiedProps, + final byte[] content) + { + final int bodySize; + bodySize = content.length; + ContentHeaderBody modifiedHeaderBody = + new ContentHeaderBody(BASIC_CLASS_ID, 0, modifiedProps, bodySize); + final MessageContentSource wrappedSource = new MessageContentSource() + { + @Override + public int getContent(final ByteBuffer buf, final int offset) + { + int size = Math.min(buf.remaining(), content.length - offset); + buf.put(content, offset, size); + return size; + } + + @Override + public ByteBuffer getContent(final int offset, final int size) + { + return ByteBuffer.wrap(content, offset, size); + } + + @Override + public long getSize() + { + return content.length; + } + }; + writeMessageDeliveryUnchanged(wrappedSource, modifiedHeaderBody, channelId, deliverBody, bodySize); + return bodySize; + } + + private void writeMessageDeliveryUnchanged(final MessageContentSource message, + final ContentHeaderBody contentHeaderBody, + final int channelId, final AMQBody deliverBody, final int bodySize) + { + if (bodySize == 0) { SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody, - contentHeaderBody); + contentHeaderBody); writeFrame(compositeBlock); } @@ -120,13 +199,14 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter int writtenSize = capacity; - AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity); + AMQBody firstContentBody = new MessageContentSourceBody(message, 0, capacity); CompositeAMQBodyBlock - compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody); + compositeBlock = + new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody); writeFrame(compositeBlock); - while(writtenSize < bodySize) + while (writtenSize < bodySize) { capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize; MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity); @@ -137,6 +217,11 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter } } + private boolean isCompressed(final ContentHeaderBody contentHeaderBody) + { + return GZIP_ENCODING.equals(contentHeaderBody.getProperties().getEncoding()); + } + private class MessageContentSourceBody implements AMQBody { public static final byte TYPE = 3; @@ -186,14 +271,14 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter } } - public void writeGetOk(final ServerMessage msg, + public long writeGetOk(final ServerMessage msg, final InstanceProperties props, int channelId, long deliveryTag, int queueSize) { AMQBody deliver = createEncodedGetOkBody(msg, props, deliveryTag, queueSize); - writeMessageDelivery(convertToAMQMessage(msg), channelId, deliver); + return writeMessageDelivery(convertToAMQMessage(msg), channelId, deliver); } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java index 7f4a3701cd..05ae5285ad 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java @@ -141,13 +141,13 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr { } - public void writeDeliver(final ServerMessage msg, + public long writeDeliver(final ServerMessage msg, final InstanceProperties props, int channelId, long deliveryTag, AMQShortString consumerTag) { _deliveryCount.incrementAndGet(); - + long size = msg.getSize(); synchronized (_channelDelivers) { Map<String, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(channelId); @@ -168,14 +168,16 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr consumerDelivers.add(new DeliveryPair(deliveryTag, msg)); } + return size; } - public void writeGetOk(final ServerMessage msg, + public long writeGetOk(final ServerMessage msg, final InstanceProperties props, int channelId, long deliveryTag, int queueSize) { + return msg.getSize(); } public void awaitDelivery(int msgs) @@ -244,11 +246,11 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr @Override - public void deliverToClient(ConsumerImpl sub, ServerMessage message, + public long deliverToClient(ConsumerImpl sub, ServerMessage message, InstanceProperties props, long deliveryTag) { _deliveryCount.incrementAndGet(); - + long size = message.getSize(); synchronized (_channelDelivers) { Map<String, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(_channelId); @@ -269,6 +271,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr consumerDelivers.add(new DeliveryPair(deliveryTag, message)); } + return size; } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java index bceae85896..918a890af5 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java @@ -112,10 +112,12 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget } } - public void send(MessageInstance entry, boolean batch) + public long send(MessageInstance entry, boolean batch) { // TODO + long size = entry.getMessage().getSize(); send(entry); + return size; } public void flushBatched() diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java index 5b9bdc7244..3572b98cad 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java @@ -32,6 +32,7 @@ import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl; import org.apache.qpid.amqp_1_0.type.Binary; import org.apache.qpid.amqp_1_0.type.Section; +import org.apache.qpid.amqp_1_0.type.Symbol; import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry; import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; import org.apache.qpid.amqp_1_0.type.messaging.Data; @@ -43,6 +44,7 @@ import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.transport.codec.BBDecoder; import org.apache.qpid.typedmessage.TypedBytesContentReader; import org.apache.qpid.typedmessage.TypedBytesFormatException; +import org.apache.qpid.util.GZIPUtils; public abstract class MessageConverter_to_1_0<M extends ServerMessage> implements MessageConverter<M, Message_1_0> { @@ -202,7 +204,19 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement SectionEncoder sectionEncoder) { final String mimeType = serverMessage.getMessageHeader().getMimeType(); - Section bodySection = getBodySection(serverMessage, mimeType); + byte[] data = new byte[(int) serverMessage.getSize()]; + serverMessage.getContent(ByteBuffer.wrap(data), 0); + byte[] uncompressed; + + if(Symbol.valueOf(GZIPUtils.GZIP_CONTENT_ENCODING).equals(metaData.getPropertiesSection().getContentEncoding()) + && (uncompressed = GZIPUtils.uncompressBufferToArray(ByteBuffer.wrap(data)))!=null) + { + data = uncompressed; + metaData.getPropertiesSection().setContentEncoding(null); + } + + + Section bodySection = convertMessageBody(mimeType, data); final ByteBuffer allData = encodeConvertedMessage(metaData, bodySection, sectionEncoder); @@ -279,14 +293,6 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement }; } - protected Section getBodySection(final M serverMessage, final String mimeType) - { - byte[] data = new byte[(int) serverMessage.getSize()]; - serverMessage.getContent(ByteBuffer.wrap(data), 0); - - return convertMessageBody(mimeType, data); - } - private ByteBuffer encodeConvertedMessage(MessageMetaData_1_0 metaData, Section bodySection, SectionEncoder sectionEncoder) { int headerSize = (int) metaData.getStorableSize(); diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java index 4540308f61..fbc24ba454 100755 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java @@ -72,6 +72,17 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData this(sections, encodeSections(sections, encoder)); } + public Properties getPropertiesSection() + { + return _properties; + } + + + public Header getHeaderSection() + { + return _header; + } + private static ArrayList<ByteBuffer> encodeSections(final List<Section> sections, final SectionEncoder encoder) { ArrayList<ByteBuffer> encodedSections = new ArrayList<ByteBuffer>(sections.size()); |
