summaryrefslogtreecommitdiff
path: root/qpid/java/common/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-08-16 15:21:41 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-08-16 15:21:41 +0000
commitc0e454cf882c7af8292832d6233940c56cc6a881 (patch)
tree317fcecc377ef9899cbb3760dfbf600295d6beb7 /qpid/java/common/src
parenta22fa634fe3a3f51d1a27078e17cba82e48fcf46 (diff)
downloadqpid-python-c0e454cf882c7af8292832d6233940c56cc6a881.tar.gz
QPID-6000 : [Java Broker] [Java Client] add the ability to configure automatic message compression
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1618375 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common/src')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java13
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java48
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java14
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java3
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java8
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/util/ByteBufferInputStream.java5
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/util/GZIPUtils.java119
-rw-r--r--qpid/java/common/src/test/java/org/apache/qpid/util/GZIPUtilsTest.java102
8 files changed, 306 insertions, 6 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
index f10961c092..24ec496cc9 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
@@ -254,6 +254,19 @@ public class ClientProperties
public static final String CONNECTION_OPTION_SSL_VERIFY_HOST_NAME = "qpid.connection_ssl_verify_hostname";
public static final boolean DEFAULT_CONNECTION_OPTION_SSL_VERIFY_HOST_NAME = true;
+ /**
+ * System property to set a default value for a connection option 'compress_messages'
+ */
+ public static final String CONNECTION_OPTION_COMPRESS_MESSAGES = "qpid.connection_compress_messages";
+ public static final boolean DEFAULT_CONNECTION_OPTION_COMPRESS_MESSAGES = false;
+
+
+ /**
+ * System property to set a default value for a connection option 'message_compression_threshold_size'
+ */
+ public static final String CONNECTION_OPTION_MESSAGE_COMPRESSION_THRESHOLD_SIZE = "qpid.message_compression_threshold_size";
+ public static final int DEFAULT_MESSAGE_COMPRESSION_THRESHOLD_SIZE = 102400;
+
private ClientProperties()
{
//No instances
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
index fe8c94cee1..b490aee898 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
@@ -20,13 +20,13 @@
*/
package org.apache.qpid.framing;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public class BasicContentHeaderProperties
{
//persistent & non-persistent constants, values as per JMS DeliveryMode
@@ -83,8 +83,48 @@ public class BasicContentHeaderProperties
private byte[] _encodedForm;
+ public BasicContentHeaderProperties(BasicContentHeaderProperties other)
+ {
+ if(other._headers != null)
+ {
+ byte[] encodedHeaders = other._headers.getDataAsBytes();
+
+ _headers = new FieldTable(encodedHeaders,0,encodedHeaders.length);
+
+ }
+
+ _contentType = other._contentType;
+
+ _encoding = other._encoding;
+
+ _deliveryMode = other._deliveryMode;
+
+ _priority = other._priority;
+
+ _correlationId = other._correlationId;
+
+ _replyTo = other._replyTo;
+
+ _expiration = other._expiration;
+
+ _messageId = other._messageId;
+
+ _timestamp = other._timestamp;
+
+ _type = other._type;
+
+ _userId = other._userId;
+
+ _appId = other._appId;
+
+ _clusterId = other._clusterId;
+
+ _propertyFlags = other._propertyFlags;
+ }
+
public BasicContentHeaderProperties()
- { }
+ {
+ }
public int getPropertyListSize()
{
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
index c4220894a8..9a455ce868 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
@@ -84,7 +84,7 @@ public class FieldTable
_encodedSize = length;
}
- public FieldTable(byte[] encodedForm, int offset, int length) throws IOException
+ public FieldTable(byte[] encodedForm, int offset, int length)
{
this();
_encodedForm = encodedForm;
@@ -858,7 +858,17 @@ public class FieldTable
}
}
- return _encodedForm.clone();
+ else if(_encodedFormOffset == 0 && _encodedSize == _encodedForm.length)
+ {
+ return _encodedForm.clone();
+ }
+ else
+ {
+ byte[] encodedCopy = new byte[(int) _encodedSize];
+ System.arraycopy(_encodedForm,_encodedFormOffset,encodedCopy,0,(int)_encodedSize);
+ return encodedCopy;
+ }
+
}
public long getEncodedSize()
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java
index b2bcc1836e..8f1a1d0be0 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/properties/ConnectionStartProperties.java
@@ -41,6 +41,9 @@ public class ConnectionStartProperties
*/
public static final String QPID_CLOSE_WHEN_NO_ROUTE = "qpid.close_when_no_route";
+ public static final String QPID_MESSAGE_COMPRESSION_SUPPORTED = "qpid.message_compression_supported";
+
+
public static final String CLIENT_ID_0_10 = "clientName";
public static final String CLIENT_ID_0_8 = "instance";
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index 44cb30e735..99fc02c959 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -41,6 +41,7 @@ import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslServer;
import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.properties.ConnectionStartProperties;
import org.apache.qpid.transport.network.Assembler;
import org.apache.qpid.transport.network.Disassembler;
import org.apache.qpid.transport.network.InputHandler;
@@ -78,6 +79,7 @@ public class Connection extends ConnectionInvoker
private long _lastReadTime;
private NetworkConnection _networkConnection;
private FrameSizeObserver _frameSizeObserver;
+ private boolean _messageCompressionSupported;
public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING }
@@ -699,6 +701,7 @@ public class Connection extends ConnectionInvoker
public void setServerProperties(final Map<String, Object> serverProperties)
{
_serverProperties = serverProperties == null ? Collections.<String, Object>emptyMap() : serverProperties;
+ _messageCompressionSupported = Boolean.parseBoolean(String.valueOf(_serverProperties.get(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED)));
}
public Map<String, Object> getServerProperties()
@@ -848,4 +851,9 @@ public class Connection extends ConnectionInvoker
};
}
}
+
+ public boolean isMessageCompressionSupported()
+ {
+ return _messageCompressionSupported;
+ }
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/ByteBufferInputStream.java b/qpid/java/common/src/main/java/org/apache/qpid/util/ByteBufferInputStream.java
index b72b342187..14b804f8c0 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/util/ByteBufferInputStream.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/util/ByteBufferInputStream.java
@@ -92,4 +92,9 @@ public class ByteBufferInputStream extends InputStream
{
return _buffer.remaining();
}
+
+ @Override
+ public void close()
+ {
+ }
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/GZIPUtils.java b/qpid/java/common/src/main/java/org/apache/qpid/util/GZIPUtils.java
new file mode 100644
index 0000000000..b5ba0b29af
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/util/GZIPUtils.java
@@ -0,0 +1,119 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.util;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GZIPUtils
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(GZIPUtils.class);
+
+ public static final String GZIP_CONTENT_ENCODING = "gzip";
+
+
+ /**
+ * Return a new byte array with the compressed contents of the input buffer
+ *
+ * @param input byte buffer to compress
+ * @return a byte array containing the compressed data, or null if the input was null or there was an unexpected
+ * IOException while compressing
+ */
+ public static byte[] compressBufferToArray(ByteBuffer input)
+ {
+ if(input != null)
+ {
+ try (ByteArrayOutputStream compressedBuffer = new ByteArrayOutputStream())
+ {
+ try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(compressedBuffer))
+ {
+ if (input.hasArray())
+ {
+ gzipOutputStream.write(input.array(),
+ input.arrayOffset() + input.position(),
+ input.remaining());
+ }
+ else
+ {
+
+ byte[] data = new byte[input.remaining()];
+
+ input.duplicate().get(data);
+
+ gzipOutputStream.write(data);
+ }
+ }
+ return compressedBuffer.toByteArray();
+ }
+ catch (IOException e)
+ {
+ LOGGER.warn("Unexpected IOException when attempting to compress with gzip", e);
+ }
+ }
+ return null;
+ }
+
+ public static byte[] uncompressBufferToArray(ByteBuffer contentBuffer)
+ {
+ if(contentBuffer != null)
+ {
+ try (ByteBufferInputStream input = new ByteBufferInputStream(contentBuffer))
+ {
+ return uncompressStreamToArray(input);
+ }
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public static byte[] uncompressStreamToArray(InputStream stream)
+ {
+ if(stream != null)
+ {
+ try (GZIPInputStream gzipInputStream = new GZIPInputStream(stream))
+ {
+ ByteArrayOutputStream inflatedContent = new ByteArrayOutputStream();
+ int read;
+ byte[] buf = new byte[4096];
+ while ((read = gzipInputStream.read(buf)) != -1)
+ {
+ inflatedContent.write(buf, 0, read);
+ }
+ return inflatedContent.toByteArray();
+ }
+ catch (IOException e)
+ {
+
+ LOGGER.warn("Unexpected IOException when attempting to uncompress with gzip", e);
+ }
+ }
+ return null;
+ }
+}
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/util/GZIPUtilsTest.java b/qpid/java/common/src/test/java/org/apache/qpid/util/GZIPUtilsTest.java
new file mode 100644
index 0000000000..60e80da15f
--- /dev/null
+++ b/qpid/java/common/src/test/java/org/apache/qpid/util/GZIPUtilsTest.java
@@ -0,0 +1,102 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.util;
+
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import junit.framework.TestCase;
+
+public class GZIPUtilsTest extends TestCase
+{
+ public void testCompressUncompress() throws Exception
+ {
+ byte[] data = new byte[1024];
+ Arrays.fill(data, (byte)'a');
+ byte[] compressed = GZIPUtils.compressBufferToArray(ByteBuffer.wrap(data));
+ assertTrue("Compression didn't compress", compressed.length < data.length);
+ byte[] uncompressed = GZIPUtils.uncompressBufferToArray(ByteBuffer.wrap(compressed));
+ assertTrue("Compression not reversible", Arrays.equals(data,uncompressed));
+ }
+
+ public void testUncompressNonZipReturnsNull() throws Exception
+ {
+ byte[] data = new byte[1024];
+ Arrays.fill(data, (byte)'a');
+ assertNull("Non zipped data should not uncompress", GZIPUtils.uncompressBufferToArray(ByteBuffer.wrap(data)));
+ }
+
+ public void testUncompressStreamWithErrorReturnsNull() throws Exception
+ {
+ InputStream is = new InputStream()
+ {
+ @Override
+ public int read() throws IOException
+ {
+ throw new IOException();
+ }
+ };
+ assertNull("Stream error should return null", GZIPUtils.uncompressStreamToArray(is));
+ }
+
+
+ public void testUncompressNullStreamReturnsNull() throws Exception
+ {
+ assertNull("Null Stream should return null", GZIPUtils.uncompressStreamToArray(null));
+ }
+ public void testUncompressNullBufferReturnsNull() throws Exception
+ {
+ assertNull("Null buffer should return null", GZIPUtils.uncompressBufferToArray(null));
+ }
+
+ public void testCompressNullArrayReturnsNull()
+ {
+ assertNull(GZIPUtils.compressBufferToArray(null));
+ }
+
+ public void testNonHeapBuffers() throws Exception
+ {
+
+ byte[] data = new byte[1024];
+ Arrays.fill(data, (byte)'a');
+ ByteBuffer directBuffer = ByteBuffer.allocateDirect(1024);
+ directBuffer.put(data);
+ directBuffer.flip();
+
+ byte[] compressed = GZIPUtils.compressBufferToArray(directBuffer);
+
+ assertTrue("Compression didn't compress", compressed.length < data.length);
+
+ directBuffer.clear();
+ directBuffer.position(1);
+ directBuffer = directBuffer.slice();
+ directBuffer.put(compressed);
+ directBuffer.flip();
+
+ byte[] uncompressed = GZIPUtils.uncompressBufferToArray(directBuffer);
+
+ assertTrue("Compression not reversible", Arrays.equals(data,uncompressed));
+
+ }
+}