diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-08-16 15:21:41 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-08-16 15:21:41 +0000 |
| commit | c0e454cf882c7af8292832d6233940c56cc6a881 (patch) | |
| tree | 317fcecc377ef9899cbb3760dfbf600295d6beb7 /qpid/java/common/src | |
| parent | a22fa634fe3a3f51d1a27078e17cba82e48fcf46 (diff) | |
| download | qpid-python-c0e454cf882c7af8292832d6233940c56cc6a881.tar.gz | |
QPID-6000 : [Java Broker] [Java Client] add the ability to configure automatic message compression
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1618375 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common/src')
8 files changed, 306 insertions, 6 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java index f10961c092..24ec496cc9 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java @@ -254,6 +254,19 @@ public class ClientProperties public static final String CONNECTION_OPTION_SSL_VERIFY_HOST_NAME = "qpid.connection_ssl_verify_hostname"; public static final boolean DEFAULT_CONNECTION_OPTION_SSL_VERIFY_HOST_NAME = true; + /** + * System property to set a default value for a connection option 'compress_messages' + */ + public static final String CONNECTION_OPTION_COMPRESS_MESSAGES = "qpid.connection_compress_messages"; + public static final boolean DEFAULT_CONNECTION_OPTION_COMPRESS_MESSAGES = false; + + + /** + * System property to set a default value for a connection option 'message_compression_threshold_size' + */ + public static final String CONNECTION_OPTION_MESSAGE_COMPRESSION_THRESHOLD_SIZE = "qpid.message_compression_threshold_size"; + public static final int DEFAULT_MESSAGE_COMPRESSION_THRESHOLD_SIZE = 102400; + private ClientProperties() { //No instances diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java index fe8c94cee1..b490aee898 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java @@ -20,13 +20,13 @@ */ package org.apache.qpid.framing; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class BasicContentHeaderProperties { //persistent & non-persistent constants, values as per JMS DeliveryMode @@ -83,8 +83,48 @@ public class BasicContentHeaderProperties private byte[] _encodedForm; + public BasicContentHeaderProperties(BasicContentHeaderProperties other) + { + if(other._headers != null) + { + byte[] encodedHeaders = other._headers.getDataAsBytes(); + + _headers = new FieldTable(encodedHeaders,0,encodedHeaders.length); + + } + + _contentType = other._contentType; + + _encoding = other._encoding; + + _deliveryMode = other._deliveryMode; + + _priority = other._priority; + + _correlationId = other._correlationId; + + _replyTo = other._replyTo; + + _expiration = other._expiration; + + _messageId = other._messageId; + + _timestamp = other._timestamp; + + _type = other._type; + + _userId = other._userId; + + _appId = other._appId; + + _clusterId = other._clusterId; + + _propertyFlags = other._propertyFlags; + } + public BasicContentHeaderProperties() - { } + { + } public int getPropertyListSize() { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java index c4220894a8..9a455ce868 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java @@ -84,7 +84,7 @@ public class FieldTable _encodedSize = length; } - public FieldTable(byte[] encodedForm, int offset, int length) throws IOException + public FieldTable(byte[] encodedForm, int offset, int length) { this(); _encodedForm = encodedForm; @@ -858,7 +858,17 @@ public class FieldTable } } - return _encodedForm.clone(); + else if(_encodedFormOffset == 0 && _encodedSize == _encodedForm.length) + { + return _encodedForm.clone(); + } + else + { + byte[] encodedCopy = new byte[(int) _encodedSize]; + System.arraycopy(_encodedForm,_encodedFormOffset,encodedCopy,0,(int)_encodedSize); + return encodedCopy; + } + } public long getEncodedSize() diff --git a/qpid/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java index b2bcc1836e..8f1a1d0be0 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java @@ -41,6 +41,9 @@ public class ConnectionStartProperties */ public static final String QPID_CLOSE_WHEN_NO_ROUTE = "qpid.close_when_no_route"; + public static final String QPID_MESSAGE_COMPRESSION_SUPPORTED = "qpid.message_compression_supported"; + + public static final String CLIENT_ID_0_10 = "clientName"; public static final String CLIENT_ID_0_8 = "instance"; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 44cb30e735..99fc02c959 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -41,6 +41,7 @@ import javax.security.sasl.SaslClient; import javax.security.sasl.SaslServer; import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.properties.ConnectionStartProperties; import org.apache.qpid.transport.network.Assembler; import org.apache.qpid.transport.network.Disassembler; import org.apache.qpid.transport.network.InputHandler; @@ -78,6 +79,7 @@ public class Connection extends ConnectionInvoker private long _lastReadTime; private NetworkConnection _networkConnection; private FrameSizeObserver _frameSizeObserver; + private boolean _messageCompressionSupported; public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING } @@ -699,6 +701,7 @@ public class Connection extends ConnectionInvoker public void setServerProperties(final Map<String, Object> serverProperties) { _serverProperties = serverProperties == null ? Collections.<String, Object>emptyMap() : serverProperties; + _messageCompressionSupported = Boolean.parseBoolean(String.valueOf(_serverProperties.get(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED))); } public Map<String, Object> getServerProperties() @@ -848,4 +851,9 @@ public class Connection extends ConnectionInvoker }; } } + + public boolean isMessageCompressionSupported() + { + return _messageCompressionSupported; + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/ByteBufferInputStream.java b/qpid/java/common/src/main/java/org/apache/qpid/util/ByteBufferInputStream.java index b72b342187..14b804f8c0 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/util/ByteBufferInputStream.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/ByteBufferInputStream.java @@ -92,4 +92,9 @@ public class ByteBufferInputStream extends InputStream { return _buffer.remaining(); } + + @Override + public void close() + { + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/GZIPUtils.java b/qpid/java/common/src/main/java/org/apache/qpid/util/GZIPUtils.java new file mode 100644 index 0000000000..b5ba0b29af --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/GZIPUtils.java @@ -0,0 +1,119 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.util; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GZIPUtils +{ + private static final Logger LOGGER = LoggerFactory.getLogger(GZIPUtils.class); + + public static final String GZIP_CONTENT_ENCODING = "gzip"; + + + /** + * Return a new byte array with the compressed contents of the input buffer + * + * @param input byte buffer to compress + * @return a byte array containing the compressed data, or null if the input was null or there was an unexpected + * IOException while compressing + */ + public static byte[] compressBufferToArray(ByteBuffer input) + { + if(input != null) + { + try (ByteArrayOutputStream compressedBuffer = new ByteArrayOutputStream()) + { + try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(compressedBuffer)) + { + if (input.hasArray()) + { + gzipOutputStream.write(input.array(), + input.arrayOffset() + input.position(), + input.remaining()); + } + else + { + + byte[] data = new byte[input.remaining()]; + + input.duplicate().get(data); + + gzipOutputStream.write(data); + } + } + return compressedBuffer.toByteArray(); + } + catch (IOException e) + { + LOGGER.warn("Unexpected IOException when attempting to compress with gzip", e); + } + } + return null; + } + + public static byte[] uncompressBufferToArray(ByteBuffer contentBuffer) + { + if(contentBuffer != null) + { + try (ByteBufferInputStream input = new ByteBufferInputStream(contentBuffer)) + { + return uncompressStreamToArray(input); + } + } + else + { + return null; + } + } + + public static byte[] uncompressStreamToArray(InputStream stream) + { + if(stream != null) + { + try (GZIPInputStream gzipInputStream = new GZIPInputStream(stream)) + { + ByteArrayOutputStream inflatedContent = new ByteArrayOutputStream(); + int read; + byte[] buf = new byte[4096]; + while ((read = gzipInputStream.read(buf)) != -1) + { + inflatedContent.write(buf, 0, read); + } + return inflatedContent.toByteArray(); + } + catch (IOException e) + { + + LOGGER.warn("Unexpected IOException when attempting to uncompress with gzip", e); + } + } + return null; + } +} diff --git a/qpid/java/common/src/test/java/org/apache/qpid/util/GZIPUtilsTest.java b/qpid/java/common/src/test/java/org/apache/qpid/util/GZIPUtilsTest.java new file mode 100644 index 0000000000..60e80da15f --- /dev/null +++ b/qpid/java/common/src/test/java/org/apache/qpid/util/GZIPUtilsTest.java @@ -0,0 +1,102 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.util; + + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.Arrays; + +import junit.framework.TestCase; + +public class GZIPUtilsTest extends TestCase +{ + public void testCompressUncompress() throws Exception + { + byte[] data = new byte[1024]; + Arrays.fill(data, (byte)'a'); + byte[] compressed = GZIPUtils.compressBufferToArray(ByteBuffer.wrap(data)); + assertTrue("Compression didn't compress", compressed.length < data.length); + byte[] uncompressed = GZIPUtils.uncompressBufferToArray(ByteBuffer.wrap(compressed)); + assertTrue("Compression not reversible", Arrays.equals(data,uncompressed)); + } + + public void testUncompressNonZipReturnsNull() throws Exception + { + byte[] data = new byte[1024]; + Arrays.fill(data, (byte)'a'); + assertNull("Non zipped data should not uncompress", GZIPUtils.uncompressBufferToArray(ByteBuffer.wrap(data))); + } + + public void testUncompressStreamWithErrorReturnsNull() throws Exception + { + InputStream is = new InputStream() + { + @Override + public int read() throws IOException + { + throw new IOException(); + } + }; + assertNull("Stream error should return null", GZIPUtils.uncompressStreamToArray(is)); + } + + + public void testUncompressNullStreamReturnsNull() throws Exception + { + assertNull("Null Stream should return null", GZIPUtils.uncompressStreamToArray(null)); + } + public void testUncompressNullBufferReturnsNull() throws Exception + { + assertNull("Null buffer should return null", GZIPUtils.uncompressBufferToArray(null)); + } + + public void testCompressNullArrayReturnsNull() + { + assertNull(GZIPUtils.compressBufferToArray(null)); + } + + public void testNonHeapBuffers() throws Exception + { + + byte[] data = new byte[1024]; + Arrays.fill(data, (byte)'a'); + ByteBuffer directBuffer = ByteBuffer.allocateDirect(1024); + directBuffer.put(data); + directBuffer.flip(); + + byte[] compressed = GZIPUtils.compressBufferToArray(directBuffer); + + assertTrue("Compression didn't compress", compressed.length < data.length); + + directBuffer.clear(); + directBuffer.position(1); + directBuffer = directBuffer.slice(); + directBuffer.put(compressed); + directBuffer.flip(); + + byte[] uncompressed = GZIPUtils.uncompressBufferToArray(directBuffer); + + assertTrue("Compression not reversible", Arrays.equals(data,uncompressed)); + + } +} |
