From 5b060554268c763cb883a102b04be21741551161 Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Mon, 28 Jan 2008 16:48:00 +0000 Subject: Merged revisions 608477,609961,610475,610479,610806,611146 via svnmerge from https://svn.apache.org/repos/asf/incubator/qpid/branches/M2 ........ r608477 | rgodfrey | 2008-01-03 13:23:04 +0000 (Thu, 03 Jan 2008) | 1 line QPID-499 : Added per-virtual host timed tasks to inspect queues (with no consumers) for expired messages ........ r609961 | ritchiem | 2008-01-08 12:59:01 +0000 (Tue, 08 Jan 2008) | 2 lines QPID-499 : Patch to update the queue size statistics when the Active TTL process runs Removed old single commented out code line from AMQSession. ........ r610475 | ritchiem | 2008-01-09 17:32:43 +0000 (Wed, 09 Jan 2008) | 1 line Qpid-723 Added exec to qpid.start ........ r610479 | ritchiem | 2008-01-09 17:39:54 +0000 (Wed, 09 Jan 2008) | 1 line Qpid-690 : Provide configurable delay between re-connecion attempts. ........ r610806 | ritchiem | 2008-01-10 14:41:37 +0000 (Thu, 10 Jan 2008) | 1 line QPID-690 : Relaxed the timings on failover as Thread.sleep is accurate to 10ms so may finish the sleep 10ms early. Resulting in erratic failures as 9.9s < 10s. ........ r611146 | ritchiem | 2008-01-11 11:33:31 +0000 (Fri, 11 Jan 2008) | 1 line Patch by Aidan Skinner to make third constructor public. This is done so that the BDBMessageStore tests can still run with the addition of the VirtualHost reaper thread. ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@615943 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java | 8 ++++++++ 1 file changed, 8 insertions(+) (limited to 'java/common/src') diff --git a/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java b/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java index 461cf9591d..633cf4fe3a 100644 --- a/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java +++ b/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java @@ -215,6 +215,14 @@ public class ConcurrentLinkedMessageQueueAtomicSize extends ConcurrentLinkedQ public void remove() { last.remove(); + if(last == _mainIterator) + { + _size.decrementAndGet(); + } + else + { + _messageHeadSize.decrementAndGet(); + } } }; } -- cgit v1.2.1 From cebb9bb85aa61402762e2567be0d6a714b65f8d2 Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Fri, 8 Feb 2008 12:52:54 +0000 Subject: QPID-588: change instances of trace() and isTraceEnabled to debug equivalent to support older versions of log4j git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@619868 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/framing/FieldTable.java | 26 +++++++++++----------- 1 file changed, 13 insertions(+), 13 deletions(-) (limited to 'java/common/src') diff --git a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java index 46b10b5963..55968ffe19 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java +++ b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java @@ -706,14 +706,14 @@ public class FieldTable public void writeToBuffer(ByteBuffer buffer) { - final boolean trace = _logger.isTraceEnabled(); + final boolean trace = _logger.isDebugEnabled(); if (trace) { - _logger.trace("FieldTable::writeToBuffer: Writing encoded length of " + getEncodedSize() + "..."); + _logger.debug("FieldTable::writeToBuffer: Writing encoded length of " + getEncodedSize() + "..."); if (_properties != null) { - _logger.trace(_properties.toString()); + _logger.debug(_properties.toString()); } } @@ -918,11 +918,11 @@ public class FieldTable final Map.Entry me = it.next(); try { - if (_logger.isTraceEnabled()) + if (_logger.isDebugEnabled()) { - _logger.trace("Writing Property:" + me.getKey() + " Type:" + me.getValue().getType() + " Value:" + _logger.debug("Writing Property:" + me.getKey() + " Type:" + me.getValue().getType() + " Value:" + me.getValue().getValue()); - _logger.trace("Buffer Position:" + buffer.position() + " Remaining:" + buffer.remaining()); + _logger.debug("Buffer Position:" + buffer.position() + " Remaining:" + buffer.remaining()); } // Write the actual parameter name @@ -931,12 +931,12 @@ public class FieldTable } catch (Exception e) { - if (_logger.isTraceEnabled()) + if (_logger.isDebugEnabled()) { - _logger.trace("Exception thrown:" + e); - _logger.trace("Writing Property:" + me.getKey() + " Type:" + me.getValue().getType() + " Value:" + _logger.debug("Exception thrown:" + e); + _logger.debug("Writing Property:" + me.getKey() + " Type:" + me.getValue().getType() + " Value:" + me.getValue().getValue()); - _logger.trace("Buffer Position:" + buffer.position() + " Remaining:" + buffer.remaining()); + _logger.debug("Buffer Position:" + buffer.position() + " Remaining:" + buffer.remaining()); } throw new RuntimeException(e); @@ -948,7 +948,7 @@ public class FieldTable private void setFromBuffer(ByteBuffer buffer, long length) throws AMQFrameDecodingException { - final boolean trace = _logger.isTraceEnabled(); + final boolean trace = _logger.isDebugEnabled(); if (length > 0) { @@ -964,7 +964,7 @@ public class FieldTable if (trace) { - _logger.trace("FieldTable::PropFieldTable(buffer," + length + "): Read type '" + value.getType() + _logger.debug("FieldTable::PropFieldTable(buffer," + length + "): Read type '" + value.getType() + "', key '" + key + "', value '" + value.getValue() + "'"); } @@ -979,7 +979,7 @@ public class FieldTable if (trace) { - _logger.trace("FieldTable::FieldTable(buffer," + length + "): Done."); + _logger.debug("FieldTable::FieldTable(buffer," + length + "): Done."); } } -- cgit v1.2.1 From 62ef6db190b842c39a7101a0d108c28554171b1b Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Wed, 13 Feb 2008 14:01:26 +0000 Subject: QPID-789 : FieldTable putDataInBuffer method not thread safe git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@627416 13f79535-47bb-0310-9956-ffa450edef68 --- java/common/src/main/java/org/apache/qpid/framing/FieldTable.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) (limited to 'java/common/src') diff --git a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java index 55968ffe19..7f851ab264 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java +++ b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java @@ -898,13 +898,15 @@ public class FieldTable if (_encodedForm != null) { - if (_encodedForm.position() != 0) + ByteBuffer encodedForm = _encodedForm.duplicate(); + + if (encodedForm.position() != 0) { - _encodedForm.flip(); + encodedForm.flip(); } // _encodedForm.limit((int)getEncodedSize()); - buffer.put(_encodedForm); + buffer.put(encodedForm); } else if (_properties != null) { -- cgit v1.2.1 From 91dfa2865cb9998a379e099ff58e830b4b1ba8a4 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Wed, 13 Feb 2008 18:10:53 +0000 Subject: QPID-790 : Performance Improvements git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@627552 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/qpid/framing/CompositeAMQDataBlock.java | 21 +++++---- .../org/apache/qpid/framing/DeferredDataBlock.java | 50 ++++++++++++++++++++++ .../qpid/framing/SmallCompositeAMQDataBlock.java | 22 +++++----- .../framing/abstraction/MessagePublishInfo.java | 2 + .../qpid/framing/amqp_0_9/MethodConverter_0_9.java | 9 +++- .../qpid/framing/amqp_8_0/MethodConverter_8_0.java | 7 ++- 6 files changed, 86 insertions(+), 25 deletions(-) create mode 100644 java/common/src/main/java/org/apache/qpid/framing/DeferredDataBlock.java (limited to 'java/common/src') diff --git a/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java b/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java index 5ec62ede93..7b6699b783 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java +++ b/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java @@ -24,7 +24,7 @@ import org.apache.mina.common.ByteBuffer; public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock { - private ByteBuffer _encodedBlock; + private AMQDataBlock _firstFrame; private AMQDataBlock[] _blocks; @@ -39,10 +39,10 @@ public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQD * @param encodedBlock already-encoded data * @param blocks some blocks to be encoded. */ - public CompositeAMQDataBlock(ByteBuffer encodedBlock, AMQDataBlock[] blocks) + public CompositeAMQDataBlock(AMQDataBlock encodedBlock, AMQDataBlock[] blocks) { this(blocks); - _encodedBlock = encodedBlock; + _firstFrame = encodedBlock; } public AMQDataBlock[] getBlocks() @@ -50,9 +50,9 @@ public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQD return _blocks; } - public ByteBuffer getEncodedBlock() + public AMQDataBlock getFirstFrame() { - return _encodedBlock; + return _firstFrame; } public long getSize() @@ -62,19 +62,18 @@ public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQD { frameSize += _blocks[i].getSize(); } - if (_encodedBlock != null) + if (_firstFrame != null) { - _encodedBlock.rewind(); - frameSize += _encodedBlock.remaining(); + frameSize += _firstFrame.getSize(); } return frameSize; } public void writePayload(ByteBuffer buffer) { - if (_encodedBlock != null) + if (_firstFrame != null) { - buffer.put(_encodedBlock); + _firstFrame.writePayload(buffer); } for (int i = 0; i < _blocks.length; i++) { @@ -91,7 +90,7 @@ public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQD else { StringBuilder buf = new StringBuilder(this.getClass().getName()); - buf.append("{encodedBlock=").append(_encodedBlock); + buf.append("{encodedBlock=").append(_firstFrame); for (int i = 0 ; i < _blocks.length; i++) { buf.append(" ").append(i).append("=[").append(_blocks[i].toString()).append("]"); diff --git a/java/common/src/main/java/org/apache/qpid/framing/DeferredDataBlock.java b/java/common/src/main/java/org/apache/qpid/framing/DeferredDataBlock.java new file mode 100644 index 0000000000..f6795ff200 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/DeferredDataBlock.java @@ -0,0 +1,50 @@ +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +public abstract class DeferredDataBlock extends AMQDataBlock +{ + private AMQDataBlock _underlyingDataBlock; + + + public long getSize() + { + if(_underlyingDataBlock == null) + { + _underlyingDataBlock = createAMQDataBlock(); + } + return _underlyingDataBlock.getSize(); + } + + public void writePayload(ByteBuffer buffer) + { + if(_underlyingDataBlock == null) + { + _underlyingDataBlock = createAMQDataBlock(); + } + _underlyingDataBlock.writePayload(buffer); + } + + abstract protected AMQDataBlock createAMQDataBlock(); + +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java b/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java index 26c048e34a..f8cf3f3011 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java +++ b/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java @@ -25,7 +25,7 @@ import org.apache.mina.common.ByteBuffer; public class SmallCompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock { - private ByteBuffer _encodedBlock; + private AMQDataBlock _firstFrame; private AMQDataBlock _block; @@ -40,10 +40,10 @@ public class SmallCompositeAMQDataBlock extends AMQDataBlock implements Encodabl * @param encodedBlock already-encoded data * @param block a block to be encoded. */ - public SmallCompositeAMQDataBlock(ByteBuffer encodedBlock, AMQDataBlock block) + public SmallCompositeAMQDataBlock(AMQDataBlock encodedBlock, AMQDataBlock block) { this(block); - _encodedBlock = encodedBlock; + _firstFrame = encodedBlock; } public AMQDataBlock getBlock() @@ -51,28 +51,28 @@ public class SmallCompositeAMQDataBlock extends AMQDataBlock implements Encodabl return _block; } - public ByteBuffer getEncodedBlock() + public AMQDataBlock getFirstFrame() { - return _encodedBlock; + return _firstFrame; } public long getSize() { long frameSize = _block.getSize(); - if (_encodedBlock != null) + if (_firstFrame != null) { - _encodedBlock.rewind(); - frameSize += _encodedBlock.remaining(); + + frameSize += _firstFrame.getSize(); } return frameSize; } public void writePayload(ByteBuffer buffer) { - if (_encodedBlock != null) + if (_firstFrame != null) { - buffer.put(_encodedBlock); + _firstFrame.writePayload(buffer); } _block.writePayload(buffer); @@ -87,7 +87,7 @@ public class SmallCompositeAMQDataBlock extends AMQDataBlock implements Encodabl else { StringBuilder buf = new StringBuilder(this.getClass().getName()); - buf.append("{encodedBlock=").append(_encodedBlock); + buf.append("{encodedBlock=").append(_firstFrame); buf.append(" _block=[").append(_block.toString()).append("]"); diff --git a/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java b/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java index 706499c1b0..49c28bb06b 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java +++ b/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java @@ -27,6 +27,8 @@ public interface MessagePublishInfo public AMQShortString getExchange(); + public void setExchange(AMQShortString exchange); + public boolean isImmediate(); public boolean isMandatory(); diff --git a/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java b/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java index de0007c132..d8b6b25b92 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java +++ b/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java @@ -67,7 +67,7 @@ public class MethodConverter_0_9 extends AbstractMethodConverter implements Prot final AMQShortString exchange = publishBody.getExchange(); final AMQShortString routingKey = publishBody.getRoutingKey(); - return new MethodConverter_0_9.MessagePublishInfoImpl(exchange == null ? null : exchange.intern(), + return new MethodConverter_0_9.MessagePublishInfoImpl(exchange, publishBody.getImmediate(), publishBody.getMandatory(), routingKey == null ? null : routingKey.intern()); @@ -87,7 +87,7 @@ public class MethodConverter_0_9 extends AbstractMethodConverter implements Prot private static class MessagePublishInfoImpl implements MessagePublishInfo { - private final AMQShortString _exchange; + private AMQShortString _exchange; private final boolean _immediate; private final boolean _mandatory; private final AMQShortString _routingKey; @@ -108,6 +108,11 @@ public class MethodConverter_0_9 extends AbstractMethodConverter implements Prot return _exchange; } + public void setExchange(AMQShortString exchange) + { + _exchange = exchange; + } + public boolean isImmediate() { return _immediate; diff --git a/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java b/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java index 7a13af8a43..b1be49a350 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java +++ b/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java @@ -107,7 +107,7 @@ public class MethodConverter_8_0 extends AbstractMethodConverter implements Prot private static class MessagePublishInfoImpl implements MessagePublishInfo { - private final AMQShortString _exchange; + private AMQShortString _exchange; private final boolean _immediate; private final boolean _mandatory; private final AMQShortString _routingKey; @@ -128,6 +128,11 @@ public class MethodConverter_8_0 extends AbstractMethodConverter implements Prot return _exchange; } + public void setExchange(AMQShortString exchange) + { + _exchange = exchange; + } + public boolean isImmediate() { return _immediate; -- cgit v1.2.1 From e1f14213a9b4d1159aee92521891fda274912fdb Mon Sep 17 00:00:00 2001 From: Rupert Smith Date: Thu, 14 Feb 2008 16:01:15 +0000 Subject: QPID-9 : Nested field tables implemented. Also wrote a test that encodes/decodes one to check it works. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@627789 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/java/org/apache/qpid/framing/AMQType.java | 262 ++++++++++++++------- .../org/apache/qpid/framing/AMQTypedValue.java | 32 ++- .../java/org/apache/qpid/framing/FieldTable.java | 62 ++++- .../qpid/framing/PropertyFieldTableTest.java | 82 +++++-- 4 files changed, 331 insertions(+), 107 deletions(-) (limited to 'java/common/src') diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQType.java b/java/common/src/main/java/org/apache/qpid/framing/AMQType.java index 6dda91a488..2c356d072c 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQType.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQType.java @@ -23,12 +23,24 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; import java.math.BigDecimal; -import java.math.BigInteger; +/** + * AMQType is a type that represents the different possible AMQP field table types. It provides operations for each + * of the types to perform tasks such as calculating the size of an instance of the type, converting types between AMQP + * and Java native types, and reading and writing instances of AMQP types in binary formats to and from byte buffers. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Get the equivalent one byte identifier for a type. + *
Calculate the size of an instance of an AMQP parameter type. {@link EncodingUtils} + *
Convert an instance of an AMQP parameter into a compatable Java object tagged with its AMQP type. + * {@link AMQTypedValue} + *
Write an instance of an AMQP parameter type to a byte buffer. {@link EncodingUtils} + *
Read an instance of an AMQP parameter from a byte buffer. {@link EncodingUtils} + *
+ */ public enum AMQType { - //AMQP FieldTable Wire Types - LONG_STRING('S') { public int getEncodingSize(Object value) @@ -36,7 +48,6 @@ public enum AMQType return EncodingUtils.encodedLongStringLength((String) value); } - public String toNativeValue(Object value) { if (value != null) @@ -58,12 +69,10 @@ public enum AMQType { return EncodingUtils.readLongString(buffer); } - }, INTEGER('i') { - public int getEncodingSize(Object value) { return EncodingUtils.unsignedIntegerLength(); @@ -89,12 +98,11 @@ public enum AMQType } else if ((value instanceof String) || (value == null)) { - return Long.valueOf((String)value); + return Long.valueOf((String) value); } else { - throw new NumberFormatException("Cannot convert: " + value + "(" + - value.getClass().getName() + ") to int."); + throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName() + ") to int."); } } @@ -111,22 +119,21 @@ public enum AMQType DECIMAL('D') { - public int getEncodingSize(Object value) { - return EncodingUtils.encodedByteLength()+ EncodingUtils.encodedIntegerLength(); + return EncodingUtils.encodedByteLength() + EncodingUtils.encodedIntegerLength(); } public Object toNativeValue(Object value) { - if(value instanceof BigDecimal) + if (value instanceof BigDecimal) { return (BigDecimal) value; } else { - throw new NumberFormatException("Cannot convert: " + value + "(" + - value.getClass().getName() + ") to BigDecimal."); + throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName() + + ") to BigDecimal."); } } @@ -150,7 +157,8 @@ public enum AMQType int unscaled = EncodingUtils.readInteger(buffer); BigDecimal bd = new BigDecimal(unscaled); - return bd.setScale(places); + + return bd.setScale(places); } }, @@ -163,14 +171,14 @@ public enum AMQType public Object toNativeValue(Object value) { - if(value instanceof Long) + if (value instanceof Long) { return (Long) value; } else { - throw new NumberFormatException("Cannot convert: " + value + "(" + - value.getClass().getName() + ") to timestamp."); + throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName() + + ") to timestamp."); } } @@ -179,37 +187,97 @@ public enum AMQType EncodingUtils.writeLong(buffer, (Long) value); } - public Object readValueFromBuffer(ByteBuffer buffer) { return EncodingUtils.readLong(buffer); } }, + /** + * Implements the field table type. The native value of a field table type will be an instance of + * {@link FieldTable}, which itself may contain name/value pairs encoded as {@link AMQTypedValue}s. + */ FIELD_TABLE('F') { + /** + * Calculates the size of an instance of the type in bytes. + * + * @param value An instance of the type. + * + * @return The size of the instance of the type in bytes. + */ public int getEncodingSize(Object value) { - // TODO : fixme - throw new UnsupportedOperationException(); + // Ensure that the value is a FieldTable. + if (!(value instanceof FieldTable)) + { + throw new IllegalArgumentException("Value is not a FieldTable."); + } + + FieldTable ftValue = (FieldTable) value; + + // Loop over all name/value pairs adding up size of each. FieldTable itself keeps track of its encoded + // size as entries are added, so no need to loop over all explicitly. + // EncodingUtils calculation of the encoded field table lenth, will include 4 bytes for its 'size' field. + return EncodingUtils.encodedFieldTableLength(ftValue); } + /** + * Converts an instance of the type to an equivalent Java native representation. + * + * @param value An instance of the type. + * + * @return An equivalent Java native representation. + */ public Object toNativeValue(Object value) { - // TODO : fixme - throw new UnsupportedOperationException(); + // Ensure that the value is a FieldTable. + if (!(value instanceof FieldTable)) + { + throw new IllegalArgumentException("Value is not a FieldTable."); + } + + return (FieldTable) value; } + /** + * Writes an instance of the type to a specified byte buffer. + * + * @param value An instance of the type. + * @param buffer The byte buffer to write it to. + */ public void writeValueImpl(Object value, ByteBuffer buffer) { - // TODO : fixme - throw new UnsupportedOperationException(); + // Ensure that the value is a FieldTable. + if (!(value instanceof FieldTable)) + { + throw new IllegalArgumentException("Value is not a FieldTable."); + } + + FieldTable ftValue = (FieldTable) value; + + // Loop over all name/values writing out into buffer. + ftValue.writeToBuffer(buffer); } + /** + * Reads an instance of the type from a specified byte buffer. + * + * @param buffer The byte buffer to write it to. + * + * @return An instance of the type. + */ public Object readValueFromBuffer(ByteBuffer buffer) { - // TODO : fixme - throw new UnsupportedOperationException(); + try + { + // Read size of field table then all name/value pairs. + return EncodingUtils.readFieldTable(buffer); + } + catch (AMQFrameDecodingException e) + { + throw new IllegalArgumentException("Unable to read field table from buffer.", e); + } } }, @@ -220,7 +288,6 @@ public enum AMQType return 0; } - public Object toNativeValue(Object value) { if (value == null) @@ -229,14 +296,13 @@ public enum AMQType } else { - throw new NumberFormatException("Cannot convert: " + value + "(" + - value.getClass().getName() + ") to null String."); + throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName() + + ") to null String."); } } public void writeValueImpl(Object value, ByteBuffer buffer) - { - } + { } public Object readValueFromBuffer(ByteBuffer buffer) { @@ -244,8 +310,6 @@ public enum AMQType } }, - // Extended types - BINARY('x') { public int getEncodingSize(Object value) @@ -253,21 +317,19 @@ public enum AMQType return EncodingUtils.encodedLongstrLength((byte[]) value); } - public Object toNativeValue(Object value) { - if((value instanceof byte[]) || (value == null)) + if ((value instanceof byte[]) || (value == null)) { return value; } else { - throw new IllegalArgumentException("Value: " + value + " (" + value.getClass().getName() + - ") cannot be converted to byte[]"); + throw new IllegalArgumentException("Value: " + value + " (" + value.getClass().getName() + + ") cannot be converted to byte[]"); } } - public void writeValueImpl(Object value, ByteBuffer buffer) { EncodingUtils.writeLongstr(buffer, (byte[]) value); @@ -277,7 +339,6 @@ public enum AMQType { return EncodingUtils.readLongstr(buffer); } - }, ASCII_STRING('c') @@ -287,7 +348,6 @@ public enum AMQType return EncodingUtils.encodedLongStringLength((String) value); } - public String toNativeValue(Object value) { if (value != null) @@ -309,7 +369,6 @@ public enum AMQType { return EncodingUtils.readLongString(buffer); } - }, WIDE_STRING('C') @@ -320,7 +379,6 @@ public enum AMQType return EncodingUtils.encodedLongStringLength((String) value); } - public String toNativeValue(Object value) { if (value != null) @@ -351,7 +409,6 @@ public enum AMQType return EncodingUtils.encodedBooleanLength(); } - public Object toNativeValue(Object value) { if (value instanceof Boolean) @@ -360,12 +417,12 @@ public enum AMQType } else if ((value instanceof String) || (value == null)) { - return Boolean.valueOf((String)value); + return Boolean.valueOf((String) value); } else { - throw new NumberFormatException("Cannot convert: " + value + "(" + - value.getClass().getName() + ") to boolean."); + throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName() + + ") to boolean."); } } @@ -374,7 +431,6 @@ public enum AMQType EncodingUtils.writeBoolean(buffer, (Boolean) value); } - public Object readValueFromBuffer(ByteBuffer buffer) { return EncodingUtils.readBoolean(buffer); @@ -388,7 +444,6 @@ public enum AMQType return EncodingUtils.encodedCharLength(); } - public Character toNativeValue(Object value) { if (value instanceof Character) @@ -401,8 +456,8 @@ public enum AMQType } else { - throw new NumberFormatException("Cannot convert: " + value + "(" + - value.getClass().getName() + ") to char."); + throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName() + + ") to char."); } } @@ -415,7 +470,6 @@ public enum AMQType { return EncodingUtils.readChar(buffer); } - }, BYTE('b') @@ -425,7 +479,6 @@ public enum AMQType return EncodingUtils.encodedByteLength(); } - public Byte toNativeValue(Object value) { if (value instanceof Byte) @@ -434,12 +487,12 @@ public enum AMQType } else if ((value instanceof String) || (value == null)) { - return Byte.valueOf((String)value); + return Byte.valueOf((String) value); } else { - throw new NumberFormatException("Cannot convert: " + value + "(" + - value.getClass().getName() + ") to byte."); + throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName() + + ") to byte."); } } @@ -456,13 +509,11 @@ public enum AMQType SHORT('s') { - public int getEncodingSize(Object value) { return EncodingUtils.encodedShortLength(); } - public Short toNativeValue(Object value) { if (value instanceof Short) @@ -475,16 +526,13 @@ public enum AMQType } else if ((value instanceof String) || (value == null)) { - return Short.valueOf((String)value); + return Short.valueOf((String) value); } - else { - throw new NumberFormatException("Cannot convert: " + value + "(" + - value.getClass().getName() + ") to short."); + throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName() + + ") to short."); } - - } public void writeValueImpl(Object value, ByteBuffer buffer) @@ -521,12 +569,11 @@ public enum AMQType } else if ((value instanceof String) || (value == null)) { - return Integer.valueOf((String)value); + return Integer.valueOf((String) value); } else { - throw new NumberFormatException("Cannot convert: " + value + "(" + - value.getClass().getName() + ") to int."); + throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName() + ") to int."); } } @@ -543,7 +590,6 @@ public enum AMQType LONG('l') { - public int getEncodingSize(Object value) { return EncodingUtils.encodedLongLength(); @@ -551,7 +597,7 @@ public enum AMQType public Object toNativeValue(Object value) { - if(value instanceof Long) + if (value instanceof Long) { return (Long) value; } @@ -569,12 +615,12 @@ public enum AMQType } else if ((value instanceof String) || (value == null)) { - return Long.valueOf((String)value); + return Long.valueOf((String) value); } else { - throw new NumberFormatException("Cannot convert: " + value + "(" + - value.getClass().getName() + ") to long."); + throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName() + + ") to long."); } } @@ -583,7 +629,6 @@ public enum AMQType EncodingUtils.writeLong(buffer, (Long) value); } - public Object readValueFromBuffer(ByteBuffer buffer) { return EncodingUtils.readLong(buffer); @@ -597,7 +642,6 @@ public enum AMQType return EncodingUtils.encodedFloatLength(); } - public Float toNativeValue(Object value) { if (value instanceof Float) @@ -606,12 +650,12 @@ public enum AMQType } else if ((value instanceof String) || (value == null)) { - return Float.valueOf((String)value); + return Float.valueOf((String) value); } else { - throw new NumberFormatException("Cannot convert: " + value + "(" + - value.getClass().getName() + ") to float."); + throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName() + + ") to float."); } } @@ -628,13 +672,11 @@ public enum AMQType DOUBLE('d') { - public int getEncodingSize(Object value) { return EncodingUtils.encodedDoubleLength(); } - public Double toNativeValue(Object value) { if (value instanceof Double) @@ -647,12 +689,12 @@ public enum AMQType } else if ((value instanceof String) || (value == null)) { - return Double.valueOf((String)value); + return Double.valueOf((String) value); } else { - throw new NumberFormatException("Cannot convert: " + value + "(" + - value.getClass().getName() + ") to double."); + throw new NumberFormatException("Cannot convert: " + value + "(" + value.getClass().getName() + + ") to double."); } } @@ -667,35 +709,87 @@ public enum AMQType } }; + /** Holds the defined one byte identifier for the type. */ private final byte _identifier; + /** + * Creates an instance of an AMQP type from its defined one byte identifier. + * + * @param identifier The one byte identifier for the type. + */ AMQType(char identifier) { _identifier = (byte) identifier; } + /** + * Extracts the byte identifier for the typ. + * + * @return The byte identifier for the typ. + */ public final byte identifier() { return _identifier; } - + /** + * Calculates the size of an instance of the type in bytes. + * + * @param value An instance of the type. + * + * @return The size of the instance of the type in bytes. + */ public abstract int getEncodingSize(Object value); + /** + * Converts an instance of the type to an equivalent Java native representation. + * + * @param value An instance of the type. + * + * @return An equivalent Java native representation. + */ public abstract Object toNativeValue(Object value); + /** + * Converts an instance of the type to an equivalent Java native representation, packaged as an + * {@link AMQTypedValue} tagged with its AMQP type. + * + * @param value An instance of the type. + * + * @return An equivalent Java native representation, tagged with its AMQP type. + */ public AMQTypedValue asTypedValue(Object value) { return new AMQTypedValue(this, toNativeValue(value)); } + /** + * Writes an instance of the type to a specified byte buffer, preceded by its one byte identifier. As the type and + * value are both written, this provides a fully encoded description of a parameters type and value. + * + * @param value An instance of the type. + * @param buffer The byte buffer to write it to. + */ public void writeToBuffer(Object value, ByteBuffer buffer) { - buffer.put((byte)identifier()); + buffer.put(identifier()); writeValueImpl(value, buffer); } + /** + * Writes an instance of the type to a specified byte buffer. + * + * @param value An instance of the type. + * @param buffer The byte buffer to write it to. + */ abstract void writeValueImpl(Object value, ByteBuffer buffer); + /** + * Reads an instance of the type from a specified byte buffer. + * + * @param buffer The byte buffer to write it to. + * + * @return An instance of the type. + */ abstract Object readValueFromBuffer(ByteBuffer buffer); } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java b/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java index 7193580884..e5b1fad9a8 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java @@ -18,23 +18,40 @@ * under the License. * */ - package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; +/** + * AMQTypedValue combines together a native Java Object value, and an {@link AMQType}, as a fully typed AMQP parameter + * value. It provides the ability to read and write fully typed parameters to and from byte buffers. It also provides + * the ability to create such parameters from Java native value and a type tag or to extract the native value and type + * from one. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Create a fully typed AMQP value from a native type and a type tag. {@link AMQType} + *
Create a fully typed AMQP value from a binary representation in a byte buffer. {@link AMQType} + *
Write a fully typed AMQP value to a binary representation in a byte buffer. {@link AMQType} + *
Extract the type from a fully typed AMQP value. + *
Extract the value from a fully typed AMQP value. + *
+ */ public class AMQTypedValue { + /** The type of the value. */ private final AMQType _type; - private final Object _value; + /** The Java native representation of the AMQP typed value. */ + private final Object _value; public AMQTypedValue(AMQType type, Object value) { - if(type == null) + if (type == null) { throw new NullPointerException("Cannot create a typed value with null type"); } + _type = type; _value = type.toNativeValue(value); } @@ -42,10 +59,9 @@ public class AMQTypedValue private AMQTypedValue(AMQType type, ByteBuffer buffer) { _type = type; - _value = type.readValueFromBuffer( buffer ); + _value = type.readValueFromBuffer(buffer); } - public AMQType getType() { return _type; @@ -56,10 +72,9 @@ public class AMQTypedValue return _value; } - public void writeToBuffer(ByteBuffer buffer) { - _type.writeToBuffer(_value,buffer); + _type.writeToBuffer(_value, buffer); } public int getEncodingSize() @@ -70,11 +85,12 @@ public class AMQTypedValue public static AMQTypedValue readFromBuffer(ByteBuffer buffer) { AMQType type = AMQTypeMap.getType(buffer.get()); + return new AMQTypedValue(type, buffer); } public String toString() { - return "["+getType()+": "+getValue()+"]"; + return "[" + getType() + ": " + getValue() + "]"; } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java index 7f851ab264..ee6762181d 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java +++ b/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java @@ -18,7 +18,6 @@ * under the License. * */ - package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; @@ -360,6 +359,41 @@ public class FieldTable } } + /** + * Extracts a value from the field table that is itself a FieldTable associated with the specified parameter name. + * + * @param string The name of the parameter to get the associated FieldTable value for. + * + * @return The associated FieldTable value, or null if the associated value is not of FieldTable type or + * not present in the field table at all. + */ + public FieldTable getFieldTable(String string) + { + return getFieldTable(new AMQShortString(string)); + } + + /** + * Extracts a value from the field table that is itself a FieldTable associated with the specified parameter name. + * + * @param string The name of the parameter to get the associated FieldTable value for. + * + * @return The associated FieldTable value, or null if the associated value is not of FieldTable type or + * not present in the field table at all. + */ + public FieldTable getFieldTable(AMQShortString string) + { + AMQTypedValue value = getProperty(string); + + if ((value != null) && (value.getType() == AMQType.FIELD_TABLE)) + { + return (FieldTable) value.getValue(); + } + else + { + return null; + } + } + public Object getObject(String string) { return getObject(new AMQShortString(string)); @@ -568,6 +602,32 @@ public class FieldTable return setProperty(string, AMQType.VOID.asTypedValue(null)); } + /** + * Associates a nested field table with the specified parameter name. + * + * @param string The name of the parameter to store in the table. + * @param ftValue The field table value to associate with the parameter name. + * + * @return The stored value. + */ + public Object setFieldTable(String string, FieldTable ftValue) + { + return setFieldTable(new AMQShortString(string), ftValue); + } + + /** + * Associates a nested field table with the specified parameter name. + * + * @param string The name of the parameter to store in the table. + * @param ftValue The field table value to associate with the parameter name. + * + * @return The stored value. + */ + public Object setFieldTable(AMQShortString string, FieldTable ftValue) + { + return setProperty(string, AMQType.FIELD_TABLE.asTypedValue(ftValue)); + } + public Object setObject(AMQShortString string, Object object) { if (object instanceof Boolean) diff --git a/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java b/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java index e63b0df770..007da7423e 100644 --- a/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java +++ b/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java @@ -1,21 +1,21 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * http://www.apache.org/licenses/LICENSE-2.0 * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. * */ package org.apache.qpid.framing; @@ -439,6 +439,60 @@ public class PropertyFieldTableTest extends TestCase Assert.assertEquals("Hello", table1.getString("value")); } + /** Check that a nested field table parameter correctly encodes and decodes to a byte buffer. */ + public void testNestedFieldTable() + { + byte[] testBytes = new byte[] { 0, 1, 2, 3, 4, 5 }; + + FieldTable outerTable = new FieldTable(); + FieldTable innerTable = new FieldTable(); + + // Put some stuff in the inner table. + innerTable.setBoolean("bool", true); + innerTable.setByte("byte", Byte.MAX_VALUE); + innerTable.setBytes("bytes", testBytes); + innerTable.setChar("char", 'c'); + innerTable.setDouble("double", Double.MAX_VALUE); + innerTable.setFloat("float", Float.MAX_VALUE); + innerTable.setInteger("int", Integer.MAX_VALUE); + innerTable.setLong("long", Long.MAX_VALUE); + innerTable.setShort("short", Short.MAX_VALUE); + innerTable.setString("string", "hello"); + innerTable.setString("null-string", null); + + // Put the inner table in the outer one. + outerTable.setFieldTable("innerTable", innerTable); + + // Write the outer table into the buffer. + final ByteBuffer buffer = ByteBuffer.allocate((int) outerTable.getEncodedSize() + 4); + outerTable.writeToBuffer(buffer); + buffer.flip(); + + // Extract the table back from the buffer again. + try + { + FieldTable extractedOuterTable = EncodingUtils.readFieldTable(buffer); + + FieldTable extractedTable = extractedOuterTable.getFieldTable("innerTable"); + + Assert.assertEquals((Boolean) true, extractedTable.getBoolean("bool")); + Assert.assertEquals((Byte) Byte.MAX_VALUE, extractedTable.getByte("byte")); + assertBytesEqual(testBytes, extractedTable.getBytes("bytes")); + Assert.assertEquals((Character) 'c', extractedTable.getCharacter("char")); + Assert.assertEquals(Double.MAX_VALUE, extractedTable.getDouble("double")); + Assert.assertEquals(Float.MAX_VALUE, extractedTable.getFloat("float")); + Assert.assertEquals((Integer) Integer.MAX_VALUE, extractedTable.getInteger("int")); + Assert.assertEquals((Long) Long.MAX_VALUE, extractedTable.getLong("long")); + Assert.assertEquals((Short) Short.MAX_VALUE, extractedTable.getShort("short")); + Assert.assertEquals("hello", extractedTable.getString("string")); + Assert.assertEquals(null, extractedTable.getString("null-string")); + } + catch (AMQFrameDecodingException e) + { + fail("Failed to decode field table with nested inner table."); + } + } + public void testValues() { FieldTable table = new FieldTable(); -- cgit v1.2.1 From 45072deb936db16404f3746eeb9dcb74e4ecd3df Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Tue, 19 Feb 2008 16:53:57 +0000 Subject: Qpid-594: make AMQConnection listen for exceptions that are thrown asynchronously in it's constructor and do something appropriate with them git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@629158 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/qpid/AMQConnectionFailureException.java | 87 ++++++++++++---------- 1 file changed, 47 insertions(+), 40 deletions(-) (limited to 'java/common/src') diff --git a/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java b/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java index f78307d16f..eddd225d28 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java @@ -1,40 +1,47 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid; - -/** - * AMQConnectionFailureException indicates that a connection to a broker could not be formed. - * - *

- *
CRC Card
Responsibilities Collaborations - *
Represents failure to connect to a broker. - *
- * - * @todo Not an AMQP exception as no status code. - */ -public class AMQConnectionFailureException extends AMQException -{ - public AMQConnectionFailureException(String message) - { - super(message); - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid; + +import org.apache.qpid.protocol.AMQConstant; + +/** + * AMQConnectionFailureException indicates that a connection to a broker could not be formed. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Represents failure to connect to a broker. + *
+ * + * @todo Not an AMQP exception as no status code. + */ +public class AMQConnectionFailureException extends AMQException +{ + public AMQConnectionFailureException(String message) + { + super(message); + } + + public AMQConnectionFailureException(AMQConstant errorCode, String message, Throwable cause) + { + super(errorCode, message, cause); + } +} -- cgit v1.2.1 From 3047c0ec2d581f4b51c77fec84fbf0bec8599573 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Thu, 21 Feb 2008 10:09:03 +0000 Subject: QPID-790 : Performance Improvements git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@629731 13f79535-47bb-0310-9956-ffa450edef68 --- .../mina/common/FixedSizeByteBufferAllocator.java | 528 +++++++++++++++++++++ .../mina/common/support/DefaultIoFuture.java | 250 ++++++++++ .../mina/filter/codec/QpidProtocolCodecFilter.java | 440 +++++++++++++++++ .../java/org/apache/qpid/codec/AMQDecoder.java | 97 ++++ .../main/java/org/apache/qpid/framing/AMQBody.java | 4 + .../java/org/apache/qpid/framing/AMQFrame.java | 58 ++- .../org/apache/qpid/framing/AMQMethodBodyImpl.java | 7 + .../org/apache/qpid/framing/AMQShortString.java | 356 ++++++++++---- .../qpid/framing/AMQShortStringTokenizer.java | 31 ++ .../apache/qpid/framing/CompositeAMQDataBlock.java | 26 +- .../java/org/apache/qpid/framing/ContentBody.java | 8 + .../org/apache/qpid/framing/ContentHeaderBody.java | 8 + .../org/apache/qpid/framing/HeartbeatBody.java | 8 + .../qpid/framing/amqp_0_9/MethodConverter_0_9.java | 2 +- .../src/main/java/org/apache/qpid/pool/Job.java | 22 +- .../java/org/apache/qpid/pool/PoolingFilter.java | 107 ++++- .../apache/qpid/protocol/AMQMethodListener.java | 3 +- .../protocol/AMQVersionAwareProtocolSession.java | 12 +- 18 files changed, 1816 insertions(+), 151 deletions(-) create mode 100644 java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java create mode 100644 java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java create mode 100644 java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java create mode 100644 java/common/src/main/java/org/apache/qpid/framing/AMQShortStringTokenizer.java (limited to 'java/common/src') diff --git a/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java b/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java new file mode 100644 index 0000000000..bed80d5954 --- /dev/null +++ b/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java @@ -0,0 +1,528 @@ +package org.apache.mina.common; + +import org.apache.mina.common.ByteBuffer; + +import java.nio.*; + +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +public class FixedSizeByteBufferAllocator implements ByteBufferAllocator +{ + + + private static final int MINIMUM_CAPACITY = 1; + + public FixedSizeByteBufferAllocator () + { + } + + public ByteBuffer allocate( int capacity, boolean direct ) + { + java.nio.ByteBuffer nioBuffer; + if( direct ) + { + nioBuffer = java.nio.ByteBuffer.allocateDirect( capacity ); + } + else + { + nioBuffer = java.nio.ByteBuffer.allocate( capacity ); + } + return new FixedSizeByteBuffer( nioBuffer ); + } + + public ByteBuffer wrap( java.nio.ByteBuffer nioBuffer ) + { + return new FixedSizeByteBuffer( nioBuffer ); + } + + public void dispose() + { + } + + + + private static final class FixedSizeByteBuffer extends ByteBuffer + { + private java.nio.ByteBuffer buf; + private int refCount = 1; + private int mark = -1; + + + protected FixedSizeByteBuffer( java.nio.ByteBuffer buf ) + { + this.buf = buf; + buf.order( ByteOrder.BIG_ENDIAN ); + refCount = 1; + } + + public synchronized void acquire() + { + if( refCount <= 0 ) + { + throw new IllegalStateException( "Already released buffer." ); + } + + refCount ++; + } + + public void release() + { + synchronized( this ) + { + if( refCount <= 0 ) + { + refCount = 0; + throw new IllegalStateException( + "Already released buffer. You released the buffer too many times." ); + } + + refCount --; + if( refCount > 0) + { + return; + } + } + } + + public java.nio.ByteBuffer buf() + { + return buf; + } + + public boolean isPooled() + { + return false; + } + + public void setPooled( boolean pooled ) + { + } + + public ByteBuffer duplicate() { + return new FixedSizeByteBuffer( this.buf.duplicate() ); + } + + public ByteBuffer slice() { + return new FixedSizeByteBuffer( this.buf.slice() ); + } + + public ByteBuffer asReadOnlyBuffer() { + return new FixedSizeByteBuffer( this.buf.asReadOnlyBuffer() ); + } + + public byte[] array() + { + return buf.array(); + } + + public int arrayOffset() + { + return buf.arrayOffset(); + } + + public boolean isDirect() + { + return buf.isDirect(); + } + + public boolean isReadOnly() + { + return buf.isReadOnly(); + } + + public int capacity() + { + return buf.capacity(); + } + + public ByteBuffer capacity( int newCapacity ) + { + if( newCapacity > capacity() ) + { + // Allocate a new buffer and transfer all settings to it. + int pos = position(); + int limit = limit(); + ByteOrder bo = order(); + + capacity0( newCapacity ); + buf.limit( limit ); + if( mark >= 0 ) + { + buf.position( mark ); + buf.mark(); + } + buf.position( pos ); + buf.order( bo ); + } + + return this; + } + + protected void capacity0( int requestedCapacity ) + { + int newCapacity = MINIMUM_CAPACITY; + while( newCapacity < requestedCapacity ) + { + newCapacity <<= 1; + } + + java.nio.ByteBuffer oldBuf = this.buf; + java.nio.ByteBuffer newBuf; + if( isDirect() ) + { + newBuf = java.nio.ByteBuffer.allocateDirect( newCapacity ); + } + else + { + newBuf = java.nio.ByteBuffer.allocate( newCapacity ); + } + + newBuf.clear(); + oldBuf.clear(); + newBuf.put( oldBuf ); + this.buf = newBuf; + } + + + + public boolean isAutoExpand() + { + return false; + } + + public ByteBuffer setAutoExpand( boolean autoExpand ) + { + if(autoExpand) throw new IllegalArgumentException(); + else return this; + } + + public ByteBuffer expand( int pos, int expectedRemaining ) + { + int end = pos + expectedRemaining; + if( end > capacity() ) + { + // The buffer needs expansion. + capacity( end ); + } + + if( end > limit() ) + { + // We call limit() directly to prevent StackOverflowError + buf.limit( end ); + } + return this; + } + + public int position() + { + return buf.position(); + } + + public ByteBuffer position( int newPosition ) + { + + buf.position( newPosition ); + if( mark > newPosition ) + { + mark = -1; + } + return this; + } + + public int limit() + { + return buf.limit(); + } + + public ByteBuffer limit( int newLimit ) + { + buf.limit( newLimit ); + if( mark > newLimit ) + { + mark = -1; + } + return this; + } + + public ByteBuffer mark() + { + buf.mark(); + mark = position(); + return this; + } + + public int markValue() + { + return mark; + } + + public ByteBuffer reset() + { + buf.reset(); + return this; + } + + public ByteBuffer clear() + { + buf.clear(); + mark = -1; + return this; + } + + public ByteBuffer flip() + { + buf.flip(); + mark = -1; + return this; + } + + public ByteBuffer rewind() + { + buf.rewind(); + mark = -1; + return this; + } + + public byte get() + { + return buf.get(); + } + + public ByteBuffer put( byte b ) + { + buf.put( b ); + return this; + } + + public byte get( int index ) + { + return buf.get( index ); + } + + public ByteBuffer put( int index, byte b ) + { + buf.put( index, b ); + return this; + } + + public ByteBuffer get( byte[] dst, int offset, int length ) + { + buf.get( dst, offset, length ); + return this; + } + + public ByteBuffer put( java.nio.ByteBuffer src ) + { + buf.put( src ); + return this; + } + + public ByteBuffer put( byte[] src, int offset, int length ) + { + buf.put( src, offset, length ); + return this; + } + + public ByteBuffer compact() + { + buf.compact(); + mark = -1; + return this; + } + + public ByteOrder order() + { + return buf.order(); + } + + public ByteBuffer order( ByteOrder bo ) + { + buf.order( bo ); + return this; + } + + public char getChar() + { + return buf.getChar(); + } + + public ByteBuffer putChar( char value ) + { + buf.putChar( value ); + return this; + } + + public char getChar( int index ) + { + return buf.getChar( index ); + } + + public ByteBuffer putChar( int index, char value ) + { + buf.putChar( index, value ); + return this; + } + + public CharBuffer asCharBuffer() + { + return buf.asCharBuffer(); + } + + public short getShort() + { + return buf.getShort(); + } + + public ByteBuffer putShort( short value ) + { + buf.putShort( value ); + return this; + } + + public short getShort( int index ) + { + return buf.getShort( index ); + } + + public ByteBuffer putShort( int index, short value ) + { + buf.putShort( index, value ); + return this; + } + + public ShortBuffer asShortBuffer() + { + return buf.asShortBuffer(); + } + + public int getInt() + { + return buf.getInt(); + } + + public ByteBuffer putInt( int value ) + { + buf.putInt( value ); + return this; + } + + public int getInt( int index ) + { + return buf.getInt( index ); + } + + public ByteBuffer putInt( int index, int value ) + { + buf.putInt( index, value ); + return this; + } + + public IntBuffer asIntBuffer() + { + return buf.asIntBuffer(); + } + + public long getLong() + { + return buf.getLong(); + } + + public ByteBuffer putLong( long value ) + { + buf.putLong( value ); + return this; + } + + public long getLong( int index ) + { + return buf.getLong( index ); + } + + public ByteBuffer putLong( int index, long value ) + { + buf.putLong( index, value ); + return this; + } + + public LongBuffer asLongBuffer() + { + return buf.asLongBuffer(); + } + + public float getFloat() + { + return buf.getFloat(); + } + + public ByteBuffer putFloat( float value ) + { + buf.putFloat( value ); + return this; + } + + public float getFloat( int index ) + { + return buf.getFloat( index ); + } + + public ByteBuffer putFloat( int index, float value ) + { + buf.putFloat( index, value ); + return this; + } + + public FloatBuffer asFloatBuffer() + { + return buf.asFloatBuffer(); + } + + public double getDouble() + { + return buf.getDouble(); + } + + public ByteBuffer putDouble( double value ) + { + buf.putDouble( value ); + return this; + } + + public double getDouble( int index ) + { + return buf.getDouble( index ); + } + + public ByteBuffer putDouble( int index, double value ) + { + buf.putDouble( index, value ); + return this; + } + + public DoubleBuffer asDoubleBuffer() + { + return buf.asDoubleBuffer(); + } + + + } + + +} diff --git a/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java b/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java new file mode 100644 index 0000000000..09cf785108 --- /dev/null +++ b/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java @@ -0,0 +1,250 @@ +package org.apache.mina.common.support; + +import org.apache.mina.common.IoFuture; +import org.apache.mina.common.IoSession; +import org.apache.mina.common.IoFutureListener; + +import java.util.List; +import java.util.ArrayList; +import java.util.Iterator; + +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * A default implementation of {@link org.apache.mina.common.IoFuture}. + * + * @author The Apache Directory Project (mina-dev@directory.apache.org) + * @version $Rev: 440259 $, $Date: 2006-09-05 14:01:47 +0900 (í™”, 05 9ì›” 2006) $ + */ +public class DefaultIoFuture implements IoFuture +{ + private final IoSession session; + private final Object lock; + private List listeners; + private Object result; + private boolean ready; + + + /** + * Creates a new instance. + * + * @param session an {@link IoSession} which is associated with this future + */ + public DefaultIoFuture( IoSession session ) + { + this.session = session; + this.lock = this; + } + + /** + * Creates a new instance which uses the specified object as a lock. + */ + public DefaultIoFuture( IoSession session, Object lock ) + { + if( lock == null ) + { + throw new NullPointerException( "lock" ); + } + this.session = session; + this.lock = lock; + } + + public IoSession getSession() + { + return session; + } + + public Object getLock() + { + return lock; + } + + public void join() + { + synchronized( lock ) + { + while( !ready ) + { + try + { + lock.wait(); + } + catch( InterruptedException e ) + { + } + } + } + } + + public boolean join( long timeoutInMillis ) + { + long startTime = ( timeoutInMillis <= 0 ) ? 0 : System + .currentTimeMillis(); + long waitTime = timeoutInMillis; + + synchronized( lock ) + { + if( ready ) + { + return ready; + } + else if( waitTime <= 0 ) + { + return ready; + } + + for( ;; ) + { + try + { + lock.wait( waitTime ); + } + catch( InterruptedException e ) + { + } + + if( ready ) + return true; + else + { + waitTime = timeoutInMillis - ( System.currentTimeMillis() - startTime ); + if( waitTime <= 0 ) + { + return ready; + } + } + } + } + } + + public boolean isReady() + { + synchronized( lock ) + { + return ready; + } + } + + /** + * Sets the result of the asynchronous operation, and mark it as finished. + */ + protected void setValue( Object newValue ) + { + synchronized( lock ) + { + // Allow only once. + if( ready ) + { + return; + } + + result = newValue; + ready = true; + lock.notifyAll(); + + notifyListeners(); + } + } + + /** + * Returns the result of the asynchronous operation. + */ + protected Object getValue() + { + synchronized( lock ) + { + return result; + } + } + + public void addListener( IoFutureListener listener ) + { + if( listener == null ) + { + throw new NullPointerException( "listener" ); + } + + synchronized( lock ) + { + if(listeners == null) + { + listeners = new ArrayList(); + } + listeners.add( listener ); + if( ready ) + { + listener.operationComplete( this ); + } + } + } + + public void removeListener( IoFutureListener listener ) + { + if( listener == null ) + { + throw new NullPointerException( "listener" ); + } + + synchronized( lock ) + { + listeners.remove( listener ); + } + } + + private void notifyListeners() + { + synchronized( lock ) + { + + if(listeners != null) + { + + for( Iterator i = listeners.iterator(); i.hasNext(); ) { + ( ( IoFutureListener ) i.next() ).operationComplete( this ); + } + } + } + } +} + + + diff --git a/java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java b/java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java new file mode 100644 index 0000000000..b8c6f29720 --- /dev/null +++ b/java/common/src/main/java/org/apache/mina/filter/codec/QpidProtocolCodecFilter.java @@ -0,0 +1,440 @@ +package org.apache.mina.filter.codec; + + +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +import org.apache.mina.common.*; +import org.apache.mina.common.support.DefaultWriteFuture; +import org.apache.mina.filter.codec.support.SimpleProtocolDecoderOutput; +import org.apache.mina.util.SessionLog; +import org.apache.mina.util.Queue; + + +public class QpidProtocolCodecFilter extends IoFilterAdapter +{ + public static final String ENCODER = QpidProtocolCodecFilter.class.getName() + ".encoder"; + public static final String DECODER = QpidProtocolCodecFilter.class.getName() + ".decoder"; + + private static final Class[] EMPTY_PARAMS = new Class[0]; + private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.wrap( new byte[0] ); + + private final ProtocolCodecFactory factory; + + public QpidProtocolCodecFilter( ProtocolCodecFactory factory ) + { + if( factory == null ) + { + throw new NullPointerException( "factory" ); + } + this.factory = factory; + } + + public QpidProtocolCodecFilter( final ProtocolEncoder encoder, final ProtocolDecoder decoder ) + { + if( encoder == null ) + { + throw new NullPointerException( "encoder" ); + } + if( decoder == null ) + { + throw new NullPointerException( "decoder" ); + } + + this.factory = new ProtocolCodecFactory() + { + public ProtocolEncoder getEncoder() + { + return encoder; + } + + public ProtocolDecoder getDecoder() + { + return decoder; + } + }; + } + + public QpidProtocolCodecFilter( final Class encoderClass, final Class decoderClass ) + { + if( encoderClass == null ) + { + throw new NullPointerException( "encoderClass" ); + } + if( decoderClass == null ) + { + throw new NullPointerException( "decoderClass" ); + } + if( !ProtocolEncoder.class.isAssignableFrom( encoderClass ) ) + { + throw new IllegalArgumentException( "encoderClass: " + encoderClass.getName() ); + } + if( !ProtocolDecoder.class.isAssignableFrom( decoderClass ) ) + { + throw new IllegalArgumentException( "decoderClass: " + decoderClass.getName() ); + } + try + { + encoderClass.getConstructor( EMPTY_PARAMS ); + } + catch( NoSuchMethodException e ) + { + throw new IllegalArgumentException( "encoderClass doesn't have a public default constructor." ); + } + try + { + decoderClass.getConstructor( EMPTY_PARAMS ); + } + catch( NoSuchMethodException e ) + { + throw new IllegalArgumentException( "decoderClass doesn't have a public default constructor." ); + } + + this.factory = new ProtocolCodecFactory() + { + public ProtocolEncoder getEncoder() throws Exception + { + return ( ProtocolEncoder ) encoderClass.newInstance(); + } + + public ProtocolDecoder getDecoder() throws Exception + { + return ( ProtocolDecoder ) decoderClass.newInstance(); + } + }; + } + + public void onPreAdd( IoFilterChain parent, String name, IoFilter.NextFilter nextFilter ) throws Exception + { + if( parent.contains( ProtocolCodecFilter.class ) ) + { + throw new IllegalStateException( "A filter chain cannot contain more than one QpidProtocolCodecFilter." ); + } + } + + public void messageReceived( IoFilter.NextFilter nextFilter, IoSession session, Object message ) throws Exception + { + if( !( message instanceof ByteBuffer ) ) + { + nextFilter.messageReceived( session, message ); + return; + } + + ByteBuffer in = ( ByteBuffer ) message; + ProtocolDecoder decoder = getDecoder( session ); + ProtocolDecoderOutput decoderOut = getDecoderOut( session, nextFilter ); + + try + { + decoder.decode( session, in, decoderOut ); + } + catch( Throwable t ) + { + ProtocolDecoderException pde; + if( t instanceof ProtocolDecoderException ) + { + pde = ( ProtocolDecoderException ) t; + } + else + { + pde = new ProtocolDecoderException( t ); + } + pde.setHexdump( in.getHexDump() ); + throw pde; + } + finally + { + // Dispose the decoder if this session is connectionless. + if( session.getTransportType().isConnectionless() ) + { + disposeDecoder( session ); + } + + // Release the read buffer. + in.release(); + + decoderOut.flush(); + } + } + + public void messageSent( IoFilter.NextFilter nextFilter, IoSession session, Object message ) throws Exception + { + if( message instanceof HiddenByteBuffer ) + { + return; + } + + if( !( message instanceof MessageByteBuffer ) ) + { + nextFilter.messageSent( session, message ); + return; + } + + nextFilter.messageSent( session, ( ( MessageByteBuffer ) message ).message ); + } + + public void filterWrite( IoFilter.NextFilter nextFilter, IoSession session, IoFilter.WriteRequest writeRequest ) throws Exception + { + Object message = writeRequest.getMessage(); + if( message instanceof ByteBuffer ) + { + nextFilter.filterWrite( session, writeRequest ); + return; + } + + ProtocolEncoder encoder = getEncoder( session ); + ProtocolEncoderOutputImpl encoderOut = getEncoderOut( session, nextFilter, writeRequest ); + + try + { + encoder.encode( session, message, encoderOut ); + encoderOut.flush(); + nextFilter.filterWrite( + session, + new IoFilter.WriteRequest( + new MessageByteBuffer( writeRequest.getMessage() ), + writeRequest.getFuture(), writeRequest.getDestination() ) ); + } + catch( Throwable t ) + { + ProtocolEncoderException pee; + if( t instanceof ProtocolEncoderException ) + { + pee = ( ProtocolEncoderException ) t; + } + else + { + pee = new ProtocolEncoderException( t ); + } + throw pee; + } + finally + { + // Dispose the encoder if this session is connectionless. + if( session.getTransportType().isConnectionless() ) + { + disposeEncoder( session ); + } + } + } + + public void sessionClosed( IoFilter.NextFilter nextFilter, IoSession session ) throws Exception + { + // Call finishDecode() first when a connection is closed. + ProtocolDecoder decoder = getDecoder( session ); + ProtocolDecoderOutput decoderOut = getDecoderOut( session, nextFilter ); + try + { + decoder.finishDecode( session, decoderOut ); + } + catch( Throwable t ) + { + ProtocolDecoderException pde; + if( t instanceof ProtocolDecoderException ) + { + pde = ( ProtocolDecoderException ) t; + } + else + { + pde = new ProtocolDecoderException( t ); + } + throw pde; + } + finally + { + // Dispose all. + disposeEncoder( session ); + disposeDecoder( session ); + + decoderOut.flush(); + } + + nextFilter.sessionClosed( session ); + } + + private ProtocolEncoder getEncoder( IoSession session ) throws Exception + { + ProtocolEncoder encoder = ( ProtocolEncoder ) session.getAttribute( ENCODER ); + if( encoder == null ) + { + encoder = factory.getEncoder(); + session.setAttribute( ENCODER, encoder ); + } + return encoder; + } + + private ProtocolEncoderOutputImpl getEncoderOut( IoSession session, IoFilter.NextFilter nextFilter, IoFilter.WriteRequest writeRequest ) + { + return new ProtocolEncoderOutputImpl( session, nextFilter, writeRequest ); + } + + private ProtocolDecoder getDecoder( IoSession session ) throws Exception + { + ProtocolDecoder decoder = ( ProtocolDecoder ) session.getAttribute( DECODER ); + if( decoder == null ) + { + decoder = factory.getDecoder(); + session.setAttribute( DECODER, decoder ); + } + return decoder; + } + + private ProtocolDecoderOutput getDecoderOut( IoSession session, IoFilter.NextFilter nextFilter ) + { + return new SimpleProtocolDecoderOutput( session, nextFilter ); + } + + private void disposeEncoder( IoSession session ) + { + ProtocolEncoder encoder = ( ProtocolEncoder ) session.removeAttribute( ENCODER ); + if( encoder == null ) + { + return; + } + + try + { + encoder.dispose( session ); + } + catch( Throwable t ) + { + SessionLog.warn( + session, + "Failed to dispose: " + encoder.getClass().getName() + + " (" + encoder + ')' ); + } + } + + private void disposeDecoder( IoSession session ) + { + ProtocolDecoder decoder = ( ProtocolDecoder ) session.removeAttribute( DECODER ); + if( decoder == null ) + { + return; + } + + try + { + decoder.dispose( session ); + } + catch( Throwable t ) + { + SessionLog.warn( + session, + "Falied to dispose: " + decoder.getClass().getName() + + " (" + decoder + ')' ); + } + } + + private static class HiddenByteBuffer extends ByteBufferProxy + { + private HiddenByteBuffer( ByteBuffer buf ) + { + super( buf ); + } + } + + private static class MessageByteBuffer extends ByteBufferProxy + { + private final Object message; + + private MessageByteBuffer( Object message ) + { + super( EMPTY_BUFFER ); + this.message = message; + } + + public void acquire() + { + // no-op since we are wraping a zero-byte buffer, this instance is to just curry the message + } + + public void release() + { + // no-op since we are wraping a zero-byte buffer, this instance is to just curry the message + } + } + + private static class ProtocolEncoderOutputImpl implements ProtocolEncoderOutput + { + private ByteBuffer buffer; + + private final IoSession session; + private final IoFilter.NextFilter nextFilter; + private final IoFilter.WriteRequest writeRequest; + + public ProtocolEncoderOutputImpl( IoSession session, IoFilter.NextFilter nextFilter, IoFilter.WriteRequest writeRequest ) + { + this.session = session; + this.nextFilter = nextFilter; + this.writeRequest = writeRequest; + } + + + + public void write( ByteBuffer buf ) + { + if(buffer != null) + { + flush(); + } + buffer = buf; + } + + public void mergeAll() + { + } + + public WriteFuture flush() + { + WriteFuture future = null; + if( buffer == null ) + { + return null; + } + else + { + ByteBuffer buf = buffer; + // Flush only when the buffer has remaining. + if( buf.hasRemaining() ) + { + future = doFlush( buf ); + } + + } + + return future; + } + + + protected WriteFuture doFlush( ByteBuffer buf ) + { + WriteFuture future = new DefaultWriteFuture( session ); + nextFilter.filterWrite( + session, + new IoFilter.WriteRequest( + buf, + future, writeRequest.getDestination() ) ); + return future; + } + } +} + diff --git a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java index ff0bc798da..7eef73f337 100644 --- a/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java +++ b/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java @@ -22,6 +22,7 @@ package org.apache.qpid.codec; import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.IoSession; +import org.apache.mina.common.SimpleByteBufferAllocator; import org.apache.mina.filter.codec.CumulativeProtocolDecoder; import org.apache.mina.filter.codec.ProtocolDecoderOutput; @@ -48,6 +49,9 @@ import org.apache.qpid.framing.ProtocolInitiation; */ public class AMQDecoder extends CumulativeProtocolDecoder { + + private static final String BUFFER = AMQDecoder.class.getName() + ".Buffer"; + /** Holds the 'normal' AMQP data decoder. */ private AMQDataBlockDecoder _dataBlockDecoder = new AMQDataBlockDecoder(); @@ -171,4 +175,97 @@ public class AMQDecoder extends CumulativeProtocolDecoder { _expectProtocolInitiation = expectProtocolInitiation; } + + + /** + * Cumulates content of in into internal buffer and forwards + * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}. + * doDecode() is invoked repeatedly until it returns false + * and the cumulative buffer is compacted after decoding ends. + * + * @throws IllegalStateException if your doDecode() returned + * true not consuming the cumulative buffer. + */ + public void decode( IoSession session, ByteBuffer in, + ProtocolDecoderOutput out ) throws Exception + { + ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER ); + // if we have a session buffer, append data to that otherwise + // use the buffer read from the network directly + if( buf != null ) + { + buf.put( in ); + buf.flip(); + } + else + { + buf = in; + } + + for( ;; ) + { + int oldPos = buf.position(); + boolean decoded = doDecode( session, buf, out ); + if( decoded ) + { + if( buf.position() == oldPos ) + { + throw new IllegalStateException( + "doDecode() can't return true when buffer is not consumed." ); + } + + if( !buf.hasRemaining() ) + { + break; + } + } + else + { + break; + } + } + + // if there is any data left that cannot be decoded, we store + // it in a buffer in the session and next time this decoder is + // invoked the session buffer gets appended to + if ( buf.hasRemaining() ) + { + storeRemainingInSession( buf, session ); + } + else + { + removeSessionBuffer( session ); + } + } + + /** + * Releases the cumulative buffer used by the specified session. + * Please don't forget to call super.dispose( session ) when + * you override this method. + */ + public void dispose( IoSession session ) throws Exception + { + removeSessionBuffer( session ); + } + + private void removeSessionBuffer(IoSession session) + { + ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER ); + if( buf != null ) + { + buf.release(); + session.removeAttribute( BUFFER ); + } + } + + private static final SimpleByteBufferAllocator SIMPLE_BYTE_BUFFER_ALLOCATOR = new SimpleByteBufferAllocator(); + + private void storeRemainingInSession(ByteBuffer buf, IoSession session) + { + ByteBuffer remainingBuf = SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate( buf.remaining(), false ); + remainingBuf.setAutoExpand( true ); + remainingBuf.put( buf ); + session.setAttribute( BUFFER, remainingBuf ); + } + } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java index 3abd97ddb7..fcd336b180 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java @@ -21,6 +21,8 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.AMQException; public interface AMQBody { @@ -36,4 +38,6 @@ public interface AMQBody //public void populateFromBuffer(ByteBuffer buffer, long size) // throws AMQFrameDecodingException, AMQProtocolVersionException; + + void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession) throws AMQException; } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java b/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java index 11f505fd4b..02a46f3748 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java @@ -27,7 +27,7 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock private final int _channel; private final AMQBody _bodyFrame; - + public static final byte FRAME_END_BYTE = (byte) 0xCE; public AMQFrame(final int channel, final AMQBody bodyFrame) @@ -47,13 +47,19 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock return 1 + 2 + 4 + _bodyFrame.getSize() + 1; } + public static final int getFrameOverhead() + { + return 1 + 2 + 4 + 1; + } + + public void writePayload(ByteBuffer buffer) { buffer.put(_bodyFrame.getFrameType()); EncodingUtils.writeUnsignedShort(buffer, _channel); EncodingUtils.writeUnsignedInteger(buffer, _bodyFrame.getSize()); _bodyFrame.writePayload(buffer); - buffer.put((byte) 0xCE); + buffer.put(FRAME_END_BYTE); } public final int getChannel() @@ -66,10 +72,54 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock return _bodyFrame; } - - public String toString() { return "Frame channelId: " + _channel + ", bodyFrame: " + String.valueOf(_bodyFrame); } + + public static void writeFrame(ByteBuffer buffer, final int channel, AMQBody body) + { + buffer.put(body.getFrameType()); + EncodingUtils.writeUnsignedShort(buffer, channel); + EncodingUtils.writeUnsignedInteger(buffer, body.getSize()); + body.writePayload(buffer); + buffer.put(FRAME_END_BYTE); + + } + + public static void writeFrames(ByteBuffer buffer, final int channel, AMQBody body1, AMQBody body2) + { + buffer.put(body1.getFrameType()); + EncodingUtils.writeUnsignedShort(buffer, channel); + EncodingUtils.writeUnsignedInteger(buffer, body1.getSize()); + body1.writePayload(buffer); + buffer.put(FRAME_END_BYTE); + buffer.put(body2.getFrameType()); + EncodingUtils.writeUnsignedShort(buffer, channel); + EncodingUtils.writeUnsignedInteger(buffer, body2.getSize()); + body2.writePayload(buffer); + buffer.put(FRAME_END_BYTE); + + } + + public static void writeFrames(ByteBuffer buffer, final int channel, AMQBody body1, AMQBody body2, AMQBody body3) + { + buffer.put(body1.getFrameType()); + EncodingUtils.writeUnsignedShort(buffer, channel); + EncodingUtils.writeUnsignedInteger(buffer, body1.getSize()); + body1.writePayload(buffer); + buffer.put(FRAME_END_BYTE); + buffer.put(body2.getFrameType()); + EncodingUtils.writeUnsignedShort(buffer, channel); + EncodingUtils.writeUnsignedInteger(buffer, body2.getSize()); + body2.writePayload(buffer); + buffer.put(FRAME_END_BYTE); + buffer.put(body3.getFrameType()); + EncodingUtils.writeUnsignedShort(buffer, channel); + EncodingUtils.writeUnsignedInteger(buffer, body3.getSize()); + body3.writePayload(buffer); + buffer.put(FRAME_END_BYTE); + + } + } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java index 5215bcbd66..64af717342 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java @@ -24,7 +24,9 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; +import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; public abstract class AMQMethodBodyImpl implements AMQMethodBody { @@ -86,4 +88,9 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody return new AMQConnectionException(code, message, getClazz(), getMethod(), getMajor(), getMinor(), cause); } + public void handle(final int channelId, final AMQVersionAwareProtocolSession session) throws AMQException + { + session.methodFrameReceived(channelId, this); + } + } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java index cf64b0475a..07e2faaf7e 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java @@ -26,8 +26,7 @@ import org.apache.mina.common.ByteBuffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Map; -import java.util.WeakHashMap; +import java.util.*; import java.lang.ref.WeakReference; /** @@ -38,6 +37,62 @@ import java.lang.ref.WeakReference; */ public final class AMQShortString implements CharSequence, Comparable { + private static final byte MINUS = (byte)'-'; + private static final byte ZERO = (byte) '0'; + + + + private final class TokenizerImpl implements AMQShortStringTokenizer + { + private final byte _delim; + private int _count = -1; + private int _pos = 0; + + public TokenizerImpl(final byte delim) + { + _delim = delim; + } + + public int countTokens() + { + if(_count == -1) + { + _count = 1 + AMQShortString.this.occurences(_delim); + } + return _count; + } + + public AMQShortString nextToken() + { + if(_pos <= AMQShortString.this.length()) + { + int nextDelim = AMQShortString.this.indexOf(_delim, _pos); + if(nextDelim == -1) + { + nextDelim = AMQShortString.this.length(); + } + + AMQShortString nextToken = AMQShortString.this.substring(_pos, nextDelim++); + _pos = nextDelim; + return nextToken; + } + else + { + return null; + } + } + + public boolean hasMoreTokens() + { + return _pos <= AMQShortString.this.length(); + } + } + + private AMQShortString substring(final int from, final int to) + { + return new AMQShortString(_data, from, to); + } + private static final ThreadLocal>> _localInternMap = new ThreadLocal>>() @@ -53,7 +108,8 @@ public final class AMQShortString implements CharSequence, Comparable terms, + final AMQShortString delim) + { + if(terms.size() == 0) + { + return EMPTY_STRING; + } + + int size = delim.length() * (terms.size() - 1); + for(AMQShortString term : terms) + { + size += term.length(); + } + + byte[] data = new byte[size]; + int pos = 0; + final byte[] delimData = delim._data; + final int delimOffset = delim._offset; + final int delimLength = delim._length; + + + for(AMQShortString term : terms) + { + + if(pos!=0) + { + System.arraycopy(delimData, delimOffset,data,pos, delimLength); + pos+=delimLength; + } + System.arraycopy(term._data,term._offset,data,pos,term._length); + pos+=term._length; + } + + + + return new AMQShortString(data,0,size); + } + + public int toIntValue() + { + int pos = 0; + int val = 0; + + + boolean isNegative = (_data[pos] == MINUS); + if(isNegative) + { + pos++; + } + while(pos < _length) + { + int digit = (int) (_data[pos++] - ZERO); + if((digit < 0) || (digit > 9)) + { + throw new NumberFormatException("\""+toString()+"\" is not a valid number"); + } + val = val * 10; + val += digit; + } + if(isNegative) + { + val = val * -1; + } + return val; + } + + public boolean contains(final byte b) + { + for(int i = 0; i < _length; i++) + { + if(_data[i] == b) + { + return true; + } + } + return false; //To change body of created methods use File | Settings | File Templates. + } + } diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQShortStringTokenizer.java b/java/common/src/main/java/org/apache/qpid/framing/AMQShortStringTokenizer.java new file mode 100644 index 0000000000..e2db8906a1 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQShortStringTokenizer.java @@ -0,0 +1,31 @@ +package org.apache.qpid.framing; + +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +public interface AMQShortStringTokenizer +{ + + public int countTokens(); + + public AMQShortString nextToken(); + + boolean hasMoreTokens(); +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java b/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java index 7b6699b783..94030f383e 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java +++ b/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java @@ -24,7 +24,6 @@ import org.apache.mina.common.ByteBuffer; public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock { - private AMQDataBlock _firstFrame; private AMQDataBlock[] _blocks; @@ -33,27 +32,12 @@ public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQD _blocks = blocks; } - /** - * The encoded block will be logically first before the AMQDataBlocks which are encoded - * into the buffer afterwards. - * @param encodedBlock already-encoded data - * @param blocks some blocks to be encoded. - */ - public CompositeAMQDataBlock(AMQDataBlock encodedBlock, AMQDataBlock[] blocks) - { - this(blocks); - _firstFrame = encodedBlock; - } public AMQDataBlock[] getBlocks() { return _blocks; } - public AMQDataBlock getFirstFrame() - { - return _firstFrame; - } public long getSize() { @@ -62,19 +46,11 @@ public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQD { frameSize += _blocks[i].getSize(); } - if (_firstFrame != null) - { - frameSize += _firstFrame.getSize(); - } return frameSize; } public void writePayload(ByteBuffer buffer) { - if (_firstFrame != null) - { - _firstFrame.writePayload(buffer); - } for (int i = 0; i < _blocks.length; i++) { _blocks[i].writePayload(buffer); @@ -90,7 +66,7 @@ public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQD else { StringBuilder buf = new StringBuilder(this.getClass().getName()); - buf.append("{encodedBlock=").append(_firstFrame); + buf.append("{"); for (int i = 0 ; i < _blocks.length; i++) { buf.append(" ").append(i).append("=[").append(_blocks[i].toString()).append("]"); diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java index cbee1680f7..969df954ce 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java @@ -21,6 +21,8 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.AMQException; public class ContentBody implements AMQBody { @@ -68,6 +70,12 @@ public class ContentBody implements AMQBody } } + public void handle(final int channelId, final AMQVersionAwareProtocolSession session) + throws AMQException + { + session.contentBodyReceived(channelId, this); + } + protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException { if (size > 0) diff --git a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java index 80a61544b3..83e5a7e341 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java @@ -21,6 +21,8 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.AMQException; public class ContentHeaderBody implements AMQBody { @@ -110,6 +112,12 @@ public class ContentHeaderBody implements AMQBody properties.writePropertyListPayload(buffer); } + public void handle(final int channelId, final AMQVersionAwareProtocolSession session) + throws AMQException + { + session.contentHeaderReceived(channelId, this); + } + public static AMQFrame createAMQFrame(int channelId, int classId, int weight, BasicContentHeaderProperties properties, long bodySize) { diff --git a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java index ef7163bd40..15a43345b5 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java @@ -21,6 +21,8 @@ package org.apache.qpid.framing; import org.apache.mina.common.ByteBuffer; +import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.AMQException; public class HeartbeatBody implements AMQBody { @@ -55,6 +57,12 @@ public class HeartbeatBody implements AMQBody { } + public void handle(final int channelId, final AMQVersionAwareProtocolSession session) + throws AMQException + { + session.heartbeatBodyReceived(channelId, this); + } + protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException { if(size > 0) diff --git a/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java b/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java index d8b6b25b92..d7194640d4 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java +++ b/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java @@ -70,7 +70,7 @@ public class MethodConverter_0_9 extends AbstractMethodConverter implements Prot return new MethodConverter_0_9.MessagePublishInfoImpl(exchange, publishBody.getImmediate(), publishBody.getMandatory(), - routingKey == null ? null : routingKey.intern()); + routingKey); } diff --git a/java/common/src/main/java/org/apache/qpid/pool/Job.java b/java/common/src/main/java/org/apache/qpid/pool/Job.java index ba3c5d03fa..b2a09ac592 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/Job.java +++ b/java/common/src/main/java/org/apache/qpid/pool/Job.java @@ -94,21 +94,23 @@ public class Job implements Runnable /** * Sequentially processes, up to the maximum number per job, the aggregated continuations in enqueued in this job. */ - void processAll() + boolean processAll() { // limit the number of events processed in one run - for (int i = 0; i < _maxEvents; i++) + int i = _maxEvents; + while( --i != 0 ) { Event e = _eventQueue.poll(); if (e == null) { - break; + return true; } else { e.process(_session); } } + return false; } /** @@ -144,9 +146,15 @@ public class Job implements Runnable */ public void run() { - processAll(); - deactivate(); - _completionHandler.completed(_session, this); + if(processAll()) + { + deactivate(); + _completionHandler.completed(_session, this); + } + else + { + _completionHandler.notCompleted(_session, this); + } } /** @@ -158,5 +166,7 @@ public class Job implements Runnable static interface JobCompletionHandler { public void completed(IoSession session, Job job); + + public void notCompleted(final IoSession session, final Job job); } } diff --git a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java index cbe08a192e..8352b5af77 100644 --- a/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java +++ b/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java @@ -29,8 +29,8 @@ import org.apache.qpid.pool.Event.CloseEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ExecutorService; /** * PoolingFilter, is a no-op pass through filter that hands all events down the Mina filter chain by default. As it @@ -84,9 +84,6 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo /** Used for debugging purposes. */ private static final Logger _logger = LoggerFactory.getLogger(PoolingFilter.class); - /** Holds a mapping from Mina sessions to batched jobs for execution. */ - private final ConcurrentMap _jobs = new ConcurrentHashMap(); - /** Holds the managed reference to obtain the executor for the batched jobs. */ private final ReferenceCountingExecutorService _poolReference; @@ -94,7 +91,9 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo private final String _name; /** Defines the maximum number of events that will be batched into a single job. */ - private final int _maxEvents = Integer.getInteger("amqj.server.read_write_pool.max_events", 10); + static final int MAX_JOB_EVENTS = Integer.getInteger("amqj.server.read_write_pool.max_events", 10); + + private final int _maxEvents; /** * Creates a named pooling filter, on the specified shared thread pool. @@ -102,10 +101,11 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo * @param refCountingPool The thread pool reference. * @param name The identifying name of the filter type. */ - public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) + public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name, int maxEvents) { _poolReference = refCountingPool; _name = name; + _maxEvents = maxEvents; } /** @@ -160,20 +160,34 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo /** * Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running. * - * @param session The Mina session to work in. + * @param job The job. * @param event The event to hand off asynchronously. */ - void fireAsynchEvent(IoSession session, Event event) + void fireAsynchEvent(Job job, Event event) { - Job job = getJobForSession(session); + // job.acquire(); //prevents this job being removed from _jobs job.add(event); - // Additional checks on pool to check that it hasn't shutdown. - // The alternative is to catch the RejectedExecutionException that will result from executing on a shutdown pool - if (job.activate() && (_poolReference.getPool() != null) && !_poolReference.getPool().isShutdown()) + final ExecutorService pool = _poolReference.getPool(); + + if(pool == null) { - _poolReference.getPool().execute(job); + return; + } + + // rather than perform additional checks on pool to check that it hasn't shutdown. + // catch the RejectedExecutionException that will result from executing on a shutdown pool + if (job.activate()) + { + try + { + pool.execute(job); + } + catch(RejectedExecutionException e) + { + _logger.warn("Thread pool shutdown while tasks still outstanding"); + } } } @@ -186,7 +200,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo */ public void createNewJobForSession(IoSession session) { - Job job = new Job(session, this, _maxEvents); + Job job = new Job(session, this, MAX_JOB_EVENTS); session.setAttribute(_name, job); } @@ -197,7 +211,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo * * @return The Job for this filter to place asynchronous events into. */ - private Job getJobForSession(IoSession session) + public Job getJobForSession(IoSession session) { return (Job) session.getAttribute(_name); } @@ -233,17 +247,57 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo // } // } // else + + if (!job.isComplete()) { + final ExecutorService pool = _poolReference.getPool(); + + if(pool == null) + { + return; + } + + // ritchiem : 2006-12-13 Do we need to perform the additional checks here? // Can the pool be shutdown at this point? - if (job.activate() && (_poolReference.getPool() != null) && !_poolReference.getPool().isShutdown()) + if (job.activate()) { - _poolReference.getPool().execute(job); + try + { + pool.execute(job); + } + catch(RejectedExecutionException e) + { + _logger.warn("Thread pool shutdown while tasks still outstanding"); + } + } } } + public void notCompleted(IoSession session, Job job) + { + final ExecutorService pool = _poolReference.getPool(); + + if(pool == null) + { + return; + } + + try + { + pool.execute(job); + } + catch(RejectedExecutionException e) + { + _logger.warn("Thread pool shutdown while tasks still outstanding"); + } + + } + + + /** * No-op pass through filter to the next filter in the chain. * @@ -400,7 +454,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo */ public AsynchReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) { - super(refCountingPool, name); + super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_read_events", MAX_JOB_EVENTS)); } /** @@ -412,8 +466,8 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo */ public void messageReceived(NextFilter nextFilter, final IoSession session, Object message) { - - fireAsynchEvent(session, new Event.ReceivedEvent(nextFilter, message)); + Job job = getJobForSession(session); + fireAsynchEvent(job, new Event.ReceivedEvent(nextFilter, message)); } /** @@ -424,7 +478,8 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo */ public void sessionClosed(final NextFilter nextFilter, final IoSession session) { - fireAsynchEvent(session, new CloseEvent(nextFilter)); + Job job = getJobForSession(session); + fireAsynchEvent(job, new CloseEvent(nextFilter)); } } @@ -442,7 +497,7 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo */ public AsynchWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name) { - super(refCountingPool, name); + super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_write_events", MAX_JOB_EVENTS)); } /** @@ -454,7 +509,8 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo */ public void filterWrite(final NextFilter nextFilter, final IoSession session, final WriteRequest writeRequest) { - fireAsynchEvent(session, new Event.WriteEvent(nextFilter, writeRequest)); + Job job = getJobForSession(session); + fireAsynchEvent(job, new Event.WriteEvent(nextFilter, writeRequest)); } /** @@ -465,7 +521,8 @@ public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCo */ public void sessionClosed(final NextFilter nextFilter, final IoSession session) { - fireAsynchEvent(session, new CloseEvent(nextFilter)); + Job job = getJobForSession(session); + fireAsynchEvent(job, new CloseEvent(nextFilter)); } } } diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java index 2fbeeda1d4..5a7679a972 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java @@ -21,6 +21,7 @@ package org.apache.qpid.protocol; import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.AMQException; /** * AMQMethodListener is a listener that receives notifications of AMQP methods. The methods are packaged as events in @@ -57,7 +58,7 @@ public interface AMQMethodListener * * @todo Consider narrowing the exception. */ - boolean methodReceived(AMQMethodEvent evt) throws Exception; + boolean methodReceived(AMQMethodEvent evt) throws AMQException; /** * Notifies the listener of an error on the event context to which it is listening. The listener should perform diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java index 035645aad2..b56a05f725 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java @@ -20,8 +20,8 @@ */ package org.apache.qpid.protocol; -import org.apache.qpid.framing.VersionSpecificRegistry; -import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.framing.*; +import org.apache.qpid.AMQException; /** * AMQVersionAwareProtocolSession is implemented by all AMQP session classes, that need to provide an awareness to @@ -46,4 +46,12 @@ public interface AMQVersionAwareProtocolSession extends AMQProtocolWriter, Proto // public VersionSpecificRegistry getRegistry(); MethodRegistry getMethodRegistry(); + + + public void methodFrameReceived(int channelId, AMQMethodBody body) throws AMQException; + public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException; + public void contentBodyReceived(int channelId, ContentBody body) throws AMQException; + public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException; + + } -- cgit v1.2.1 From 90b7512b4814a0efd6fd5567d6d2a21c5c14ac0b Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Fri, 22 Feb 2008 16:15:11 +0000 Subject: QPID-790 : Performance Improvements git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@630239 13f79535-47bb-0310-9956-ffa450edef68 --- .../mina/common/support/DefaultIoFuture.java | 40 +++++----------------- .../main/java/org/apache/qpid/framing/AMQBody.java | 3 -- 2 files changed, 9 insertions(+), 34 deletions(-) (limited to 'java/common/src') diff --git a/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java b/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java index 09cf785108..c515263317 100644 --- a/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java +++ b/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java @@ -1,34 +1,3 @@ -package org.apache.mina.common.support; - -import org.apache.mina.common.IoFuture; -import org.apache.mina.common.IoSession; -import org.apache.mina.common.IoFutureListener; - -import java.util.List; -import java.util.ArrayList; -import java.util.Iterator; - -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ - /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -48,6 +17,15 @@ import java.util.Iterator; * under the License. * */ +package org.apache.mina.common.support; + +import org.apache.mina.common.IoFuture; +import org.apache.mina.common.IoSession; +import org.apache.mina.common.IoFutureListener; + +import java.util.List; +import java.util.ArrayList; +import java.util.Iterator; /** * A default implementation of {@link org.apache.mina.common.IoFuture}. diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java index fcd336b180..fe04155bb8 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java @@ -36,8 +36,5 @@ public interface AMQBody public void writePayload(ByteBuffer buffer); - //public void populateFromBuffer(ByteBuffer buffer, long size) - // throws AMQFrameDecodingException, AMQProtocolVersionException; - void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession) throws AMQException; } -- cgit v1.2.1 From 882a98c31e6ceb34e41c67562da9069dfb4cfda1 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Mon, 25 Feb 2008 03:22:39 +0000 Subject: QPID-806 : Added startsWith and endsWith methods to AMQShortString, including test. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@630723 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/framing/AMQShortString.java | 1312 +++++++++++--------- .../apache/qpid/framing/AMQShortStringTest.java | 62 + 2 files changed, 754 insertions(+), 620 deletions(-) create mode 100644 java/common/src/test/java/org/apache/qpid/framing/AMQShortStringTest.java (limited to 'java/common/src') diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java index 07e2faaf7e..505c819bb2 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java @@ -1,620 +1,692 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.framing; - -import org.apache.mina.common.ByteBuffer; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.*; -import java.lang.ref.WeakReference; - -/** - * A short string is a representation of an AMQ Short String - * Short strings differ from the Java String class by being limited to on ASCII characters (0-127) - * and thus can be held more effectively in a byte buffer. - * - */ -public final class AMQShortString implements CharSequence, Comparable -{ - private static final byte MINUS = (byte)'-'; - private static final byte ZERO = (byte) '0'; - - - - private final class TokenizerImpl implements AMQShortStringTokenizer - { - private final byte _delim; - private int _count = -1; - private int _pos = 0; - - public TokenizerImpl(final byte delim) - { - _delim = delim; - } - - public int countTokens() - { - if(_count == -1) - { - _count = 1 + AMQShortString.this.occurences(_delim); - } - return _count; - } - - public AMQShortString nextToken() - { - if(_pos <= AMQShortString.this.length()) - { - int nextDelim = AMQShortString.this.indexOf(_delim, _pos); - if(nextDelim == -1) - { - nextDelim = AMQShortString.this.length(); - } - - AMQShortString nextToken = AMQShortString.this.substring(_pos, nextDelim++); - _pos = nextDelim; - return nextToken; - } - else - { - return null; - } - } - - public boolean hasMoreTokens() - { - return _pos <= AMQShortString.this.length(); - } - } - - private AMQShortString substring(final int from, final int to) - { - return new AMQShortString(_data, from, to); - } - - - private static final ThreadLocal>> _localInternMap = - new ThreadLocal>>() - { - protected Map> initialValue() - { - return new WeakHashMap>(); - }; - }; - - private static final Map> _globalInternMap = - new WeakHashMap>(); - - private static final Logger _logger = LoggerFactory.getLogger(AMQShortString.class); - - private final byte[] _data; - private final int _offset; - private int _hashCode; - private final int _length; - private static final char[] EMPTY_CHAR_ARRAY = new char[0]; - - public static final AMQShortString EMPTY_STRING = new AMQShortString((String)null); - - public AMQShortString(byte[] data) - { - - _data = data.clone(); - _length = data.length; - _offset = 0; - } - - public AMQShortString(byte[] data, int pos) - { - final int size = data[pos++]; - final byte[] dataCopy = new byte[size]; - System.arraycopy(data,pos,dataCopy,0,size); - _length = size; - _data = dataCopy; - _offset = 0; - } - - public AMQShortString(String data) - { - this((data == null) ? EMPTY_CHAR_ARRAY : data.toCharArray()); - - } - - public AMQShortString(char[] data) - { - if (data == null) - { - throw new NullPointerException("Cannot create AMQShortString with null char[]"); - } - - final int length = data.length; - final byte[] stringBytes = new byte[length]; - int hash = 0; - for (int i = 0; i < length; i++) - { - stringBytes[i] = (byte) (0xFF & data[i]); - hash = (31 * hash) + stringBytes[i]; - } - _hashCode = hash; - _data = stringBytes; - - _length = length; - _offset = 0; - - } - - public AMQShortString(CharSequence charSequence) - { - final int length = charSequence.length(); - final byte[] stringBytes = new byte[length]; - int hash = 0; - for (int i = 0; i < length; i++) - { - stringBytes[i] = ((byte) (0xFF & charSequence.charAt(i))); - hash = (31 * hash) + stringBytes[i]; - - } - - _data = stringBytes; - _hashCode = hash; - _length = length; - _offset = 0; - - } - - private AMQShortString(ByteBuffer data, final int length) - { - byte[] dataBytes = new byte[length]; - data.get(dataBytes); - _data = dataBytes; - _length = length; - _offset = 0; - - } - - private AMQShortString(final byte[] data, final int from, final int to) - { - _offset = from; - _length = to - from; - _data = data; - } - - - /** - * Get the length of the short string - * @return length of the underlying byte array - */ - public int length() - { - return _length; - } - - public char charAt(int index) - { - - return (char) _data[_offset + index]; - - } - - public CharSequence subSequence(int start, int end) - { - return new CharSubSequence(start, end); - } - - public int writeToByteArray(byte[] encoding, int pos) - { - final int size = length(); - encoding[pos++] = (byte) size; - System.arraycopy(_data,_offset,encoding,pos,size); - return pos+size; - } - - public static AMQShortString readFromByteArray(byte[] byteEncodedDestination, int pos) - { - - - final AMQShortString shortString = new AMQShortString(byteEncodedDestination, pos); - if(shortString.length() == 0) - { - return null; - } - else - { - return shortString; - } - } - - public static AMQShortString readFromBuffer(ByteBuffer buffer) - { - final short length = buffer.getUnsigned(); - if (length == 0) - { - return null; - } - else - { - - return new AMQShortString(buffer, length); - } - } - - public byte[] getBytes() - { - if(_offset == 0 && _length == _data.length) - { - return _data.clone(); - } - else - { - byte[] data = new byte[_length]; - System.arraycopy(_data,_offset,data,0,_length); - return data; - } - } - - public void writeToBuffer(ByteBuffer buffer) - { - - final int size = length(); - //buffer.setAutoExpand(true); - buffer.put((byte) size); - buffer.put(_data, _offset, size); - - } - - private final class CharSubSequence implements CharSequence - { - private final int _sequenceOffset; - private final int _end; - - public CharSubSequence(final int offset, final int end) - { - _sequenceOffset = offset; - _end = end; - } - - public int length() - { - return _end - _sequenceOffset; - } - - public char charAt(int index) - { - return AMQShortString.this.charAt(index + _sequenceOffset); - } - - public CharSequence subSequence(int start, int end) - { - return new CharSubSequence(start + _sequenceOffset, end + _sequenceOffset); - } - } - - public char[] asChars() - { - final int size = length(); - final char[] chars = new char[size]; - - for (int i = 0; i < size; i++) - { - chars[i] = (char) _data[i + _offset]; - } - - return chars; - } - - public String asString() - { - return new String(asChars()); - } - - public boolean equals(Object o) - { - - - if(o instanceof AMQShortString) - { - return equals((AMQShortString)o); - } - if(o instanceof CharSequence) - { - return equals((CharSequence)o); - } - - if (o == null) - { - return false; - } - - if (o == this) - { - return true; - } - - - return false; - - } - - public boolean equals(final AMQShortString otherString) - { - if (otherString == this) - { - return true; - } - - if (otherString == null) - { - return false; - } - - if ((_hashCode != 0) && (otherString._hashCode != 0) && (_hashCode != otherString._hashCode)) - { - return false; - } - - return (_offset == 0 && otherString._offset == 0 && _length == _data.length && otherString._length == otherString._data.length && Arrays.equals(_data,otherString._data)) - || Arrays.equals(getBytes(),otherString.getBytes()); - - } - - public boolean equals(CharSequence s) - { - if(s instanceof AMQShortString) - { - return equals((AMQShortString)s); - } - - if (s == null) - { - return false; - } - - if (s.length() != length()) - { - return false; - } - - for (int i = 0; i < length(); i++) - { - if (charAt(i) != s.charAt(i)) - { - return false; - } - } - - return true; - } - - public int hashCode() - { - int hash = _hashCode; - if (hash == 0) - { - final int size = length(); - - for (int i = 0; i < size; i++) - { - hash = (31 * hash) + _data[i+_offset]; - } - - _hashCode = hash; - } - - return hash; - } - - public void setDirty() - { - _hashCode = 0; - } - - public String toString() - { - return asString(); - } - - public int compareTo(AMQShortString name) - { - if (name == null) - { - return 1; - } - else - { - - if (name.length() < length()) - { - return -name.compareTo(this); - } - - for (int i = 0; i < length(); i++) - { - final byte d = _data[i+_offset]; - final byte n = name._data[i+name._offset]; - if (d < n) - { - return -1; - } - - if (d > n) - { - return 1; - } - } - - return (length() == name.length()) ? 0 : -1; - } - } - - - public AMQShortStringTokenizer tokenize(byte delim) - { - return new TokenizerImpl(delim); - } - - - public AMQShortString intern() - { - - hashCode(); - - Map> localMap = - _localInternMap.get(); - - WeakReference ref = localMap.get(this); - AMQShortString internString; - - if(ref != null) - { - internString = ref.get(); - if(internString != null) - { - return internString; - } - } - - - synchronized(_globalInternMap) - { - - ref = _globalInternMap.get(this); - if((ref == null) || ((internString = ref.get()) == null)) - { - internString = new AMQShortString(getBytes()); - ref = new WeakReference(internString); - _globalInternMap.put(internString, ref); - } - - } - localMap.put(internString, ref); - return internString; - - } - - private int occurences(final byte delim) - { - int count = 0; - final int end = _offset + _length; - for(int i = _offset ; i < end ; i++ ) - { - if(_data[i] == delim) - { - count++; - } - } - return count; - } - - private int indexOf(final byte val, final int pos) - { - - for(int i = pos; i < length(); i++) - { - if(_data[_offset+i] == val) - { - return i; - } - } - return -1; - } - - - public static AMQShortString join(final Collection terms, - final AMQShortString delim) - { - if(terms.size() == 0) - { - return EMPTY_STRING; - } - - int size = delim.length() * (terms.size() - 1); - for(AMQShortString term : terms) - { - size += term.length(); - } - - byte[] data = new byte[size]; - int pos = 0; - final byte[] delimData = delim._data; - final int delimOffset = delim._offset; - final int delimLength = delim._length; - - - for(AMQShortString term : terms) - { - - if(pos!=0) - { - System.arraycopy(delimData, delimOffset,data,pos, delimLength); - pos+=delimLength; - } - System.arraycopy(term._data,term._offset,data,pos,term._length); - pos+=term._length; - } - - - - return new AMQShortString(data,0,size); - } - - public int toIntValue() - { - int pos = 0; - int val = 0; - - - boolean isNegative = (_data[pos] == MINUS); - if(isNegative) - { - pos++; - } - while(pos < _length) - { - int digit = (int) (_data[pos++] - ZERO); - if((digit < 0) || (digit > 9)) - { - throw new NumberFormatException("\""+toString()+"\" is not a valid number"); - } - val = val * 10; - val += digit; - } - if(isNegative) - { - val = val * -1; - } - return val; - } - - public boolean contains(final byte b) - { - for(int i = 0; i < _length; i++) - { - if(_data[i] == b) - { - return true; - } - } - return false; //To change body of created methods use File | Settings | File Templates. - } - -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.framing; + +import org.apache.mina.common.ByteBuffer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.lang.ref.WeakReference; + +/** + * A short string is a representation of an AMQ Short String + * Short strings differ from the Java String class by being limited to on ASCII characters (0-127) + * and thus can be held more effectively in a byte buffer. + * + */ +public final class AMQShortString implements CharSequence, Comparable +{ + private static final byte MINUS = (byte)'-'; + private static final byte ZERO = (byte) '0'; + + + + private final class TokenizerImpl implements AMQShortStringTokenizer + { + private final byte _delim; + private int _count = -1; + private int _pos = 0; + + public TokenizerImpl(final byte delim) + { + _delim = delim; + } + + public int countTokens() + { + if(_count == -1) + { + _count = 1 + AMQShortString.this.occurences(_delim); + } + return _count; + } + + public AMQShortString nextToken() + { + if(_pos <= AMQShortString.this.length()) + { + int nextDelim = AMQShortString.this.indexOf(_delim, _pos); + if(nextDelim == -1) + { + nextDelim = AMQShortString.this.length(); + } + + AMQShortString nextToken = AMQShortString.this.substring(_pos, nextDelim++); + _pos = nextDelim; + return nextToken; + } + else + { + return null; + } + } + + public boolean hasMoreTokens() + { + return _pos <= AMQShortString.this.length(); + } + } + + private AMQShortString substring(final int from, final int to) + { + return new AMQShortString(_data, from, to); + } + + + private static final ThreadLocal>> _localInternMap = + new ThreadLocal>>() + { + protected Map> initialValue() + { + return new WeakHashMap>(); + }; + }; + + private static final Map> _globalInternMap = + new WeakHashMap>(); + + private static final Logger _logger = LoggerFactory.getLogger(AMQShortString.class); + + private final byte[] _data; + private final int _offset; + private int _hashCode; + private final int _length; + private static final char[] EMPTY_CHAR_ARRAY = new char[0]; + + public static final AMQShortString EMPTY_STRING = new AMQShortString((String)null); + + public AMQShortString(byte[] data) + { + + _data = data.clone(); + _length = data.length; + _offset = 0; + } + + public AMQShortString(byte[] data, int pos) + { + final int size = data[pos++]; + final byte[] dataCopy = new byte[size]; + System.arraycopy(data,pos,dataCopy,0,size); + _length = size; + _data = dataCopy; + _offset = 0; + } + + public AMQShortString(String data) + { + this((data == null) ? EMPTY_CHAR_ARRAY : data.toCharArray()); + + } + + public AMQShortString(char[] data) + { + if (data == null) + { + throw new NullPointerException("Cannot create AMQShortString with null char[]"); + } + + final int length = data.length; + final byte[] stringBytes = new byte[length]; + int hash = 0; + for (int i = 0; i < length; i++) + { + stringBytes[i] = (byte) (0xFF & data[i]); + hash = (31 * hash) + stringBytes[i]; + } + _hashCode = hash; + _data = stringBytes; + + _length = length; + _offset = 0; + + } + + public AMQShortString(CharSequence charSequence) + { + final int length = charSequence.length(); + final byte[] stringBytes = new byte[length]; + int hash = 0; + for (int i = 0; i < length; i++) + { + stringBytes[i] = ((byte) (0xFF & charSequence.charAt(i))); + hash = (31 * hash) + stringBytes[i]; + + } + + _data = stringBytes; + _hashCode = hash; + _length = length; + _offset = 0; + + } + + private AMQShortString(ByteBuffer data, final int length) + { + byte[] dataBytes = new byte[length]; + data.get(dataBytes); + _data = dataBytes; + _length = length; + _offset = 0; + + } + + private AMQShortString(final byte[] data, final int from, final int to) + { + _offset = from; + _length = to - from; + _data = data; + } + + + /** + * Get the length of the short string + * @return length of the underlying byte array + */ + public int length() + { + return _length; + } + + public char charAt(int index) + { + + return (char) _data[_offset + index]; + + } + + public CharSequence subSequence(int start, int end) + { + return new CharSubSequence(start, end); + } + + public int writeToByteArray(byte[] encoding, int pos) + { + final int size = length(); + encoding[pos++] = (byte) size; + System.arraycopy(_data,_offset,encoding,pos,size); + return pos+size; + } + + public static AMQShortString readFromByteArray(byte[] byteEncodedDestination, int pos) + { + + + final AMQShortString shortString = new AMQShortString(byteEncodedDestination, pos); + if(shortString.length() == 0) + { + return null; + } + else + { + return shortString; + } + } + + public static AMQShortString readFromBuffer(ByteBuffer buffer) + { + final short length = buffer.getUnsigned(); + if (length == 0) + { + return null; + } + else + { + + return new AMQShortString(buffer, length); + } + } + + public byte[] getBytes() + { + if(_offset == 0 && _length == _data.length) + { + return _data.clone(); + } + else + { + byte[] data = new byte[_length]; + System.arraycopy(_data,_offset,data,0,_length); + return data; + } + } + + public void writeToBuffer(ByteBuffer buffer) + { + + final int size = length(); + //buffer.setAutoExpand(true); + buffer.put((byte) size); + buffer.put(_data, _offset, size); + + } + + public boolean endsWith(String s) + { + return endsWith(new AMQShortString(s)); + } + + + public boolean endsWith(AMQShortString otherString) + { + + if (otherString.length() > length()) + { + return false; + } + + + int thisLength = length(); + int otherLength = otherString.length(); + + for (int i = 1; i <= otherLength; i++) + { + if (charAt(thisLength - i) != otherString.charAt(otherLength - i)) + { + return false; + } + } + return true; + } + + public boolean startsWith(String s) + { + return startsWith(new AMQShortString(s)); + } + + public boolean startsWith(AMQShortString otherString) + { + + if (otherString.length() > length()) + { + return false; + } + + for (int i = 0; i < otherString.length(); i++) + { + if (_data[i] != otherString._data[i]) + { + return false; + } + } + + return true; + + } + + public boolean startsWith(CharSequence otherString) + { + if (otherString.length() > length()) + { + return false; + } + + for (int i = 0; i < otherString.length(); i++) + { + if (charAt(i) != otherString.charAt(i)) + { + return false; + } + } + + return true; + } + + + private final class CharSubSequence implements CharSequence + { + private final int _sequenceOffset; + private final int _end; + + public CharSubSequence(final int offset, final int end) + { + _sequenceOffset = offset; + _end = end; + } + + public int length() + { + return _end - _sequenceOffset; + } + + public char charAt(int index) + { + return AMQShortString.this.charAt(index + _sequenceOffset); + } + + public CharSequence subSequence(int start, int end) + { + return new CharSubSequence(start + _sequenceOffset, end + _sequenceOffset); + } + } + + public char[] asChars() + { + final int size = length(); + final char[] chars = new char[size]; + + for (int i = 0; i < size; i++) + { + chars[i] = (char) _data[i + _offset]; + } + + return chars; + } + + public String asString() + { + return new String(asChars()); + } + + public boolean equals(Object o) + { + + + if(o instanceof AMQShortString) + { + return equals((AMQShortString)o); + } + if(o instanceof CharSequence) + { + return equals((CharSequence)o); + } + + if (o == null) + { + return false; + } + + if (o == this) + { + return true; + } + + + return false; + + } + + public boolean equals(final AMQShortString otherString) + { + if (otherString == this) + { + return true; + } + + if (otherString == null) + { + return false; + } + + if ((_hashCode != 0) && (otherString._hashCode != 0) && (_hashCode != otherString._hashCode)) + { + return false; + } + + return (_offset == 0 && otherString._offset == 0 && _length == _data.length && otherString._length == otherString._data.length && Arrays.equals(_data,otherString._data)) + || Arrays.equals(getBytes(),otherString.getBytes()); + + } + + public boolean equals(CharSequence s) + { + if(s instanceof AMQShortString) + { + return equals((AMQShortString)s); + } + + if (s == null) + { + return false; + } + + if (s.length() != length()) + { + return false; + } + + for (int i = 0; i < length(); i++) + { + if (charAt(i) != s.charAt(i)) + { + return false; + } + } + + return true; + } + + public int hashCode() + { + int hash = _hashCode; + if (hash == 0) + { + final int size = length(); + + for (int i = 0; i < size; i++) + { + hash = (31 * hash) + _data[i+_offset]; + } + + _hashCode = hash; + } + + return hash; + } + + public void setDirty() + { + _hashCode = 0; + } + + public String toString() + { + return asString(); + } + + public int compareTo(AMQShortString name) + { + if (name == null) + { + return 1; + } + else + { + + if (name.length() < length()) + { + return -name.compareTo(this); + } + + for (int i = 0; i < length(); i++) + { + final byte d = _data[i+_offset]; + final byte n = name._data[i+name._offset]; + if (d < n) + { + return -1; + } + + if (d > n) + { + return 1; + } + } + + return (length() == name.length()) ? 0 : -1; + } + } + + + public AMQShortStringTokenizer tokenize(byte delim) + { + return new TokenizerImpl(delim); + } + + + public AMQShortString intern() + { + + hashCode(); + + Map> localMap = + _localInternMap.get(); + + WeakReference ref = localMap.get(this); + AMQShortString internString; + + if(ref != null) + { + internString = ref.get(); + if(internString != null) + { + return internString; + } + } + + + synchronized(_globalInternMap) + { + + ref = _globalInternMap.get(this); + if((ref == null) || ((internString = ref.get()) == null)) + { + internString = new AMQShortString(getBytes()); + ref = new WeakReference(internString); + _globalInternMap.put(internString, ref); + } + + } + localMap.put(internString, ref); + return internString; + + } + + private int occurences(final byte delim) + { + int count = 0; + final int end = _offset + _length; + for(int i = _offset ; i < end ; i++ ) + { + if(_data[i] == delim) + { + count++; + } + } + return count; + } + + private int indexOf(final byte val, final int pos) + { + + for(int i = pos; i < length(); i++) + { + if(_data[_offset+i] == val) + { + return i; + } + } + return -1; + } + + + public static AMQShortString join(final Collection terms, + final AMQShortString delim) + { + if(terms.size() == 0) + { + return EMPTY_STRING; + } + + int size = delim.length() * (terms.size() - 1); + for(AMQShortString term : terms) + { + size += term.length(); + } + + byte[] data = new byte[size]; + int pos = 0; + final byte[] delimData = delim._data; + final int delimOffset = delim._offset; + final int delimLength = delim._length; + + + for(AMQShortString term : terms) + { + + if(pos!=0) + { + System.arraycopy(delimData, delimOffset,data,pos, delimLength); + pos+=delimLength; + } + System.arraycopy(term._data,term._offset,data,pos,term._length); + pos+=term._length; + } + + + + return new AMQShortString(data,0,size); + } + + public int toIntValue() + { + int pos = 0; + int val = 0; + + + boolean isNegative = (_data[pos] == MINUS); + if(isNegative) + { + pos++; + } + while(pos < _length) + { + int digit = (int) (_data[pos++] - ZERO); + if((digit < 0) || (digit > 9)) + { + throw new NumberFormatException("\""+toString()+"\" is not a valid number"); + } + val = val * 10; + val += digit; + } + if(isNegative) + { + val = val * -1; + } + return val; + } + + public boolean contains(final byte b) + { + for(int i = 0; i < _length; i++) + { + if(_data[i] == b) + { + return true; + } + } + return false; //To change body of created methods use File | Settings | File Templates. + } + +} diff --git a/java/common/src/test/java/org/apache/qpid/framing/AMQShortStringTest.java b/java/common/src/test/java/org/apache/qpid/framing/AMQShortStringTest.java new file mode 100644 index 0000000000..0ea2c8b9c1 --- /dev/null +++ b/java/common/src/test/java/org/apache/qpid/framing/AMQShortStringTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.framing; + +import junit.framework.TestCase; +public class AMQShortStringTest extends TestCase +{ + + AMQShortString Hello = new AMQShortString("Hello"); + AMQShortString Hell = new AMQShortString("Hell"); + AMQShortString Goodbye = new AMQShortString("Goodbye"); + AMQShortString Good = new AMQShortString("Good"); + AMQShortString Bye = new AMQShortString("Bye"); + + public void testStartsWith() + { + assertTrue(Hello.startsWith(Hell)); + + assertFalse(Hell.startsWith(Hello)); + + assertTrue(Goodbye.startsWith(Good)); + + assertFalse(Good.startsWith(Goodbye)); + } + + public void testEndWith() + { + assertFalse(Hell.endsWith(Hello)); + + assertTrue(Goodbye.endsWith(new AMQShortString("bye"))); + + assertFalse(Goodbye.endsWith(Bye)); + } + + + public void testEquals() + { + assertEquals(Goodbye, new AMQShortString("Goodbye")); + assertEquals(new AMQShortString("A"), new AMQShortString("A")); + assertFalse(new AMQShortString("A").equals(new AMQShortString("a"))); + } + + +} -- cgit v1.2.1 From 6aa10cc1aeb0ffbc6b02bf662b93eab879c517d7 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Mon, 10 Mar 2008 17:16:09 +0000 Subject: QPID-107 : Changes based on code review. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@635602 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/framing/AMQShortString.java | 4 ++-- .../apache/qpid/framing/AMQShortStringTest.java | 26 +++++++++++----------- 2 files changed, 15 insertions(+), 15 deletions(-) (limited to 'java/common/src') diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java index 505c819bb2..665cbf7a84 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java @@ -325,7 +325,7 @@ public final class AMQShortString implements CharSequence, Comparable Date: Fri, 14 Mar 2008 10:46:40 +0000 Subject: QPID-592 : Parameterised the Read/Write buffer limits. On the broker extra config [read|write]BufferLimitSize on the client System properties qpid.[read|write].buffer.limit. All the defaults are 256k(262144). git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@637047 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/mina/SocketIOTest/IOWriterClient.java | 9 +++++---- .../java/org/apache/mina/SocketIOTest/IOWriterServer.java | 11 +++++++---- 2 files changed, 12 insertions(+), 8 deletions(-) (limited to 'java/common/src') diff --git a/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java b/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java index 0b6ed81d18..b93dc46741 100644 --- a/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java +++ b/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterClient.java @@ -131,19 +131,20 @@ public class IOWriterClient implements Runnable private int _receivedCount = 0; private int _sentCount = 0; + private static final String DEFAULT_READ_BUFFER = "262144"; + private static final String DEFAULT_WRITE_BUFFER = "262144"; public void sessionCreated(IoSession session) throws Exception { IoFilterChain chain = session.getFilterChain(); - int buf_size = ((SocketSessionConfig) session.getConfig()).getSendBufferSize(); ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder(); - readfilter.setMaximumConnectionBufferSize(buf_size); + readfilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.read.buffer.limit", DEFAULT_READ_BUFFER))); readfilter.attach(chain); WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder(); -// writefilter.setMaximumConnectionBufferCount(1000); - writefilter.setMaximumConnectionBufferSize(buf_size * 2); + + writefilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.write.buffer.limit", DEFAULT_WRITE_BUFFER))); writefilter.attach(chain); } diff --git a/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterServer.java b/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterServer.java index 82ef3d57cc..423e98c67b 100644 --- a/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterServer.java +++ b/java/common/src/test/java/org/apache/mina/SocketIOTest/IOWriterServer.java @@ -39,6 +39,9 @@ public class IOWriterServer static public int _PORT = 9999; + private static final String DEFAULT_READ_BUFFER = "262144"; + private static final String DEFAULT_WRITE_BUFFER = "262144"; + private static class TestHandler extends IoHandlerAdapter { @@ -52,14 +55,14 @@ public class IOWriterServer { IoFilterChain chain = ioSession.getFilterChain(); - int buf_size = ((SocketSessionConfig) ioSession.getConfig()).getReceiveBufferSize(); - ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder(); - readfilter.setMaximumConnectionBufferSize(buf_size); + readfilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.read.buffer.limit", DEFAULT_READ_BUFFER))); readfilter.attach(chain); WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder(); - writefilter.setMaximumConnectionBufferSize(buf_size * 2); + + writefilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.write.buffer.limit", DEFAULT_WRITE_BUFFER))); + writefilter.attach(chain); } -- cgit v1.2.1 From f79decd5edc925aefebac3a5b93c04192a84f9c7 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Tue, 18 Mar 2008 12:33:34 +0000 Subject: QPID-847 : Prevented the InvalidArgumentException from closing the connection. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@638346 13f79535-47bb-0310-9956-ffa450edef68 --- java/common/src/main/java/org/apache/qpid/AMQException.java | 5 +++++ .../src/main/java/org/apache/qpid/AMQInvalidArgumentException.java | 6 ++++++ .../src/main/java/org/apache/qpid/AMQUndeliveredException.java | 6 ++++++ 3 files changed, 17 insertions(+) (limited to 'java/common/src') diff --git a/java/common/src/main/java/org/apache/qpid/AMQException.java b/java/common/src/main/java/org/apache/qpid/AMQException.java index 41599ed880..00396f6583 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQException.java @@ -98,4 +98,9 @@ public class AMQException extends Exception { return _errorCode; } + + public boolean isHardError() + { + return true; + } } diff --git a/java/common/src/main/java/org/apache/qpid/AMQInvalidArgumentException.java b/java/common/src/main/java/org/apache/qpid/AMQInvalidArgumentException.java index 278128f924..6725c1cfe8 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQInvalidArgumentException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQInvalidArgumentException.java @@ -36,4 +36,10 @@ public class AMQInvalidArgumentException extends AMQException { super(AMQConstant.INVALID_ARGUMENT, message); } + + public boolean isHardError() + { + return false; + } + } diff --git a/java/common/src/main/java/org/apache/qpid/AMQUndeliveredException.java b/java/common/src/main/java/org/apache/qpid/AMQUndeliveredException.java index 03220cc95e..1e2788f9f5 100644 --- a/java/common/src/main/java/org/apache/qpid/AMQUndeliveredException.java +++ b/java/common/src/main/java/org/apache/qpid/AMQUndeliveredException.java @@ -45,4 +45,10 @@ public class AMQUndeliveredException extends AMQException { return _bounced; } + + public boolean isHardError() + { + return false; + } + } -- cgit v1.2.1