From 633c33f224f3196f3f9bd80bd2e418d8143fea06 Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Fri, 4 May 2012 15:39:19 +0000 Subject: QPID-3858: Updated branch - merged from trunk r.1333987 git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1334037 13f79535-47bb-0310-9956-ffa450edef68 --- java/common/src/main/java/common.bnd | 2 +- .../org/apache/qpid/codec/MarkableDataInput.java | 20 ++++ .../org/apache/qpid/common/AMQPFilterTypes.java | 4 +- .../org/apache/qpid/framing/AMQShortString.java | 5 + .../apache/qpid/framing/ByteArrayDataInput.java | 20 ++++ .../org/apache/qpid/framing/ContentHeaderBody.java | 11 -- .../org/apache/qpid/framing/EncodingUtils.java | 3 - .../org/apache/qpid/framing/ExtendedDataInput.java | 20 ++++ .../qpid/properties/ConnectionStartProperties.java | 21 ++-- .../java/org/apache/qpid/transport/Connection.java | 1 + .../transport/NetworkTransportConfiguration.java | 4 + .../main/java/org/apache/qpid/transport/Range.java | 5 + .../java/org/apache/qpid/transport/RangeSet.java | 2 + .../org/apache/qpid/transport/RangeSetImpl.java | 62 +++++++++++ .../org/apache/qpid/transport/ServerDelegate.java | 8 +- .../java/org/apache/qpid/transport/Session.java | 16 +-- .../qpid/transport/codec/AbstractEncoder.java | 2 + .../apache/qpid/transport/network/Assembler.java | 2 +- .../transport/network/io/IoNetworkTransport.java | 2 +- .../org/apache/qpid/test/utils/QpidTestCase.java | 18 ++-- .../org/apache/qpid/transport/RangeSetTest.java | 114 +++++++++++++++++++++ 21 files changed, 289 insertions(+), 53 deletions(-) (limited to 'java/common') diff --git a/java/common/src/main/java/common.bnd b/java/common/src/main/java/common.bnd index 64e80c9b43..9149986aa3 100755 --- a/java/common/src/main/java/common.bnd +++ b/java/common/src/main/java/common.bnd @@ -17,7 +17,7 @@ # under the License. # -ver: 0.15.0 +ver: 0.17.0 Bundle-SymbolicName: qpid-common Bundle-Version: ${ver} diff --git a/java/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java b/java/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java index 2a243a810d..a1513135a3 100644 --- a/java/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java +++ b/java/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java @@ -1,3 +1,23 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.qpid.codec; import org.apache.qpid.framing.AMQShortString; diff --git a/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java b/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java index 9ed915cc35..57cd2a1ff5 100644 --- a/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java +++ b/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java @@ -34,11 +34,13 @@ public enum AMQPFilterTypes { JMS_SELECTOR("x-filter-jms-selector"), NO_CONSUME("x-filter-no-consume"), - AUTO_CLOSE("x-filter-auto-close"); + AUTO_CLOSE("x-filter-auto-close"), + NO_LOCAL("x-qpid-no-local"); /** The identifying string for the filter type. */ private final AMQShortString _value; + /** * Creates a new filter type from its identifying string. * diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java index 85870e68c5..fdc71e31f9 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java @@ -858,4 +858,9 @@ public final class AMQShortString implements CharSequence, Comparable void add(int value); + void subtract(final RangeSet other); + void clear(); RangeSet copy(); diff --git a/java/common/src/main/java/org/apache/qpid/transport/RangeSetImpl.java b/java/common/src/main/java/org/apache/qpid/transport/RangeSetImpl.java index 3e24c10a06..adf18e2920 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/RangeSetImpl.java +++ b/java/common/src/main/java/org/apache/qpid/transport/RangeSetImpl.java @@ -150,6 +150,68 @@ public class RangeSetImpl implements RangeSet ranges.clear(); } + public void subtract(final RangeSet other) + { + final Iterator otherIter = other.iterator() ; + if (otherIter.hasNext()) + { + Range otherRange = otherIter.next(); + final ListIterator iter = ranges.listIterator() ; + if (iter.hasNext()) + { + Range range = iter.next(); + do + { + if (otherRange.getUpper() < range.getLower()) + { + otherRange = nextRange(otherIter) ; + } + else if (range.getUpper() < otherRange.getLower()) + { + range = nextRange(iter) ; + } + else + { + final boolean first = range.getLower() < otherRange.getLower() ; + final boolean second = otherRange.getUpper() < range.getUpper() ; + + if (first) + { + iter.set(Range.newInstance(range.getLower(), otherRange.getLower()-1)) ; + if (second) + { + iter.add(Range.newInstance(otherRange.getUpper()+1, range.getUpper())) ; + iter.previous() ; + range = iter.next() ; + } + else + { + range = nextRange(iter) ; + } + } + else if (second) + { + range = Range.newInstance(otherRange.getUpper()+1, range.getUpper()) ; + iter.set(range) ; + otherRange = nextRange(otherIter) ; + } + else + { + iter.remove() ; + range = nextRange(iter) ; + } + } + } + while ((otherRange != null) && (range != null)) ; + } + } + } + + private Range nextRange(final Iterator iter) + { + return (iter.hasNext() ? iter.next() : null) ; + } + public RangeSet copy() { return new org.apache.qpid.transport.RangeSetImpl(this); diff --git a/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java index d30e48ad85..ec409d1c72 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java @@ -190,18 +190,12 @@ public class ServerDelegate extends ConnectionDelegate @Override public void sessionAttach(Connection conn, SessionAttach atc) - { - sessionAttachImpl(conn, atc); - } - - protected Session sessionAttachImpl(Connection conn, SessionAttach atc) { Session ssn = getSession(conn, atc); conn.map(ssn, atc.getChannel()); + conn.registerSession(ssn); ssn.sessionAttached(atc.getName()); ssn.setState(Session.State.OPEN); - - return ssn; } protected void setConnectionTuneOkChannelMax(final Connection conn, final int okChannelMax) diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java index d450746eaa..110c73f718 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -161,7 +161,7 @@ public class Session extends SessionInvoker this.expiry = expiry; } - void setClose(boolean close) + protected void setClose(boolean close) { this.closing = close; } @@ -513,20 +513,12 @@ public class Session extends SessionInvoker void knownComplete(RangeSet kc) { - synchronized (processedLock) + if (kc.size() > 0) { - RangeSet newProcessed = RangeSetFactory.createRangeSet(); - for (Range pr : processed) + synchronized (processedLock) { - for (Range kr : kc) - { - for (Range r : pr.subtract(kr)) - { - newProcessed.add(r); - } - } + processed.subtract(kc) ; } - this.processed = newProcessed; } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java b/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java index a38c83d4cb..2b93697bfc 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java +++ b/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java @@ -25,6 +25,7 @@ import org.apache.qpid.transport.RangeSet; import org.apache.qpid.transport.Struct; import org.apache.qpid.transport.Type; +import org.apache.qpid.transport.Xid; import static org.apache.qpid.transport.util.Functions.lsb; import java.io.UnsupportedEncodingException; @@ -61,6 +62,7 @@ abstract class AbstractEncoder implements Encoder ENCODINGS.put(Character.class, Type.CHAR); ENCODINGS.put(byte[].class, Type.VBIN32); ENCODINGS.put(UUID.class, Type.UUID); + ENCODINGS.put(Xid.class, Type.STRUCT32); } private final Map str8cache = new LinkedHashMap() diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java b/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java index b80d5dfd30..a80b988cea 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java @@ -189,7 +189,7 @@ public class Assembler implements Receiver, NetworkDelegate command = Method.create(commandType); command.setSync((0x0001 & hdr) != 0); command.read(dec); - if (command.hasPayload()) + if (command.hasPayload() && !frame.isLastSegment()) { setIncompleteCommand(channel, command); } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java index 387777a80c..42c8334a5d 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java @@ -157,7 +157,7 @@ public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNet _factory = factory; _sslContext = sslContext; - InetSocketAddress address = new InetSocketAddress(config.getHost(), config.getPort()); + InetSocketAddress address = config.getAddress(); if(sslContext == null) { diff --git a/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java b/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java index 94db7d8dde..cbf6caf141 100644 --- a/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java +++ b/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java @@ -42,6 +42,7 @@ public class QpidTestCase extends TestCase { public static final String QPID_HOME = System.getProperty("QPID_HOME"); public static final String TEST_RESOURCES_DIR = QPID_HOME + "/../test-profiles/test_resources/"; + public static final String TMP_FOLDER = System.getProperty("java.io.tmpdir"); private static final Logger _logger = Logger.getLogger(QpidTestCase.class); @@ -108,10 +109,10 @@ public class QpidTestCase extends TestCase _exclusionList = exclusionList; } } - - protected static final String MS_CLASS_NAME_KEY = "messagestore.class.name"; - protected static final String MEMORY_STORE_CLASS_NAME = "org.apache.qpid.server.store.MemoryMessageStore"; - + + protected static final String MS_FACTORY_CLASS_NAME_KEY = "messagestorefactory.class.name"; + protected static final String MEMORY_STORE_FACTORY_CLASS_NAME = "org.apache.qpid.server.store.MemoryMessageStoreFactory"; + private static List _exclusionList; public QpidTestCase() @@ -139,11 +140,12 @@ public class QpidTestCase extends TestCase } } - public String getTestProfileMessageStoreClassName() + public String getTestProfileMessageStoreFactoryClassName() { - String storeClass = System.getProperty(MS_CLASS_NAME_KEY); + final String storeFactoryClass = System.getProperty(MS_FACTORY_CLASS_NAME_KEY); + _logger.debug("MS_FACTORY_CLASS_NAME_KEY " + storeFactoryClass); - return storeClass != null ? storeClass : MEMORY_STORE_CLASS_NAME ; + return storeFactoryClass != null ? storeFactoryClass : MEMORY_STORE_FACTORY_CLASS_NAME ; } @@ -157,7 +159,7 @@ public class QpidTestCase extends TestCase * @param fromPort the port to scan for availability * @throws NoSuchElementException if there are no ports available */ - protected int getNextAvailable(int fromPort) + public int getNextAvailable(int fromPort) { if ((fromPort < MIN_PORT_NUMBER) || (fromPort > MAX_PORT_NUMBER)) { diff --git a/java/common/src/test/java/org/apache/qpid/transport/RangeSetTest.java b/java/common/src/test/java/org/apache/qpid/transport/RangeSetTest.java index 56dbaf5535..14589eb541 100644 --- a/java/common/src/test/java/org/apache/qpid/transport/RangeSetTest.java +++ b/java/common/src/test/java/org/apache/qpid/transport/RangeSetTest.java @@ -27,6 +27,7 @@ import static org.apache.qpid.util.Serial.eq; import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; import java.util.List; /** @@ -236,4 +237,117 @@ public class RangeSetTest extends TestCase assertEquals(d.getUpper(), 10); } + public void testSetSubtract1() + { + final RangeSet orig = createRangeSet(0, 10) ; + final RangeSet update = createRangeSet(3, 15) ; + orig.subtract(update) ; + checkRange(orig, 0, 2) ; + } + + public void testSetSubtract2() + { + final RangeSet orig = createRangeSet(0, 10) ; + final RangeSet update = createRangeSet(3, 10) ; + orig.subtract(update) ; + checkRange(orig, 0, 2) ; + } + + public void testSetSubtract3() + { + final RangeSet orig = createRangeSet(0, 10) ; + final RangeSet update = createRangeSet(3, 4) ; + orig.subtract(update) ; + checkRange(orig, 0, 2, 5, 10) ; + } + + public void testSetSubtract4() + { + final RangeSet orig = createRangeSet(3, 15) ; + final RangeSet update = createRangeSet(0, 10) ; + orig.subtract(update) ; + checkRange(orig, 11, 15) ; + } + + public void testSetSubtract5() + { + final RangeSet orig = createRangeSet(3, 10) ; + final RangeSet update = createRangeSet(0, 10) ; + orig.subtract(update) ; + checkRange(orig) ; + } + + public void testSetSubtract6() + { + final RangeSet orig = createRangeSet(3, 10) ; + final RangeSet update = createRangeSet(0, 15) ; + orig.subtract(update) ; + checkRange(orig) ; + } + + public void testSetSubtract7() + { + final RangeSet orig = createRangeSet(0, 10) ; + final RangeSet update = createRangeSet(0, 15) ; + orig.subtract(update) ; + checkRange(orig) ; + } + + public void testSetSubtract8() + { + final RangeSet orig = createRangeSet(0, 15) ; + final RangeSet update = createRangeSet(0, 10) ; + orig.subtract(update) ; + checkRange(orig, 11, 15) ; + } + + public void testSetSubtract9() + { + final RangeSet orig = createRangeSet(0, 15, 20, 30) ; + final RangeSet update = createRangeSet(2, 3, 5, 6, 8, 9, 22, 23, 27, 28) ; + orig.subtract(update) ; + checkRange(orig, 0, 1, 4, 4, 7, 7, 10, 15, 20, 21, 24, 26, 29, 30) ; + } + + public void testSetSubtract10() + { + final RangeSet orig = createRangeSet(0, 15, 20, 30) ; + final RangeSet update = createRangeSet(0, 2, 4, 6, 10, 22, 24, 24, 27, 30) ; + orig.subtract(update) ; + checkRange(orig, 3, 3, 7, 9, 23, 23, 25, 26) ; + } + + public void testSetSubtract11() + { + final RangeSet orig = createRangeSet(0, 2, 4, 6, 10, 22, 24, 24, 27, 30) ; + final RangeSet update = createRangeSet(0, 2, 4, 6, 10, 22, 24, 24, 27, 30) ; + orig.subtract(update) ; + checkRange(orig) ; + } + + private RangeSet createRangeSet(int ... bounds) + { + RangeSet set = RangeSetFactory.createRangeSet(); + final int length = (bounds == null ? 0 : bounds.length) ; + int count = 0 ; + while(count < length) + { + set.add(bounds[count++], bounds[count++]) ; + } + return set ; + } + + private void checkRange(final RangeSet rangeSet, int ... bounds) + { + final int length = (bounds == null ? 0 : bounds.length) ; + assertEquals("Range count", length/2, rangeSet.size()) ; + final Iterator iter = rangeSet.iterator() ; + int count = 0 ; + while(count < length) + { + final Range range = iter.next() ; + assertEquals("Range lower", bounds[count++], range.getLower()) ; + assertEquals("Range upper", bounds[count++], range.getUpper()) ; + } + } } -- cgit v1.2.1