summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-02-19 13:51:04 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-02-19 13:51:04 +0000
commit5b5e8fc7d7ed98f91e57db5c408d91796302da45 (patch)
tree9d3d482a9705af66470fb31eb100e72f857165e0 /qpid/java
parent2790ce755170f85afc66183e25f8165ef9c1dd0e (diff)
downloadqpid-python-qpid-2935.tar.gz
QPID-2935: merge latest trunkqpid-2935
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2935@1072330 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java19
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java20
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java5
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java14
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java9
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java25
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java9
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java7
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java10
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java5
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java65
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java57
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java141
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java16
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/UTF8Test.java35
15 files changed, 190 insertions, 247 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
index c0afae0773..2c8fd737c3 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
@@ -712,6 +712,25 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable
return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
}
+ public BrokerSchema.BrokerClass.CreateMethodResponseCommand create(final BrokerSchema.BrokerClass.CreateMethodResponseCommandFactory factory,
+ final String type,
+ final String name,
+ final Map properties,
+ final java.lang.Boolean lenient)
+ {
+ //TODO:
+ return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
+ }
+
+ public BrokerSchema.BrokerClass.DeleteMethodResponseCommand delete(final BrokerSchema.BrokerClass.DeleteMethodResponseCommandFactory factory,
+ final String type,
+ final String name,
+ final Map options)
+ {
+ //TODO:
+ return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
+ }
+
public UUID getId()
{
return _obj.getId();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java
index 3d31a705fe..2ebbfeb734 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java
@@ -21,9 +21,6 @@
package org.apache.qpid.server.logging.actors;
import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.LogMessage;
-import org.apache.qpid.server.logging.RootMessageLogger;
import java.util.EmptyStackException;
import java.util.Stack;
@@ -72,7 +69,7 @@ public class CurrentActor
private static LogActor _defaultActor;
/**
- * Set a new LogActor to be the Current Actor
+ * Set a new {@link LogActor} to be the Current Actor
* <p/>
* This pushes the Actor in to the LIFO Queue
*
@@ -85,7 +82,16 @@ public class CurrentActor
}
/**
- * Remove the current LogActor.
+ * Remove all {@link LogActor}s
+ */
+ public static void removeAll()
+ {
+ Stack<LogActor> stack = _currentActor.get();
+ stack.clear();
+ }
+
+ /**
+ * Remove the current {@link LogActor}.
* <p/>
* Calling remove without calling set will result in an EmptyStackException.
*/
@@ -96,9 +102,7 @@ public class CurrentActor
}
/**
- * Return the current head of the list of LogActors.
- * <p/>
- * If there has been no set call then this will return Null.
+ * Return the current head of the list of {@link LogActor}s.
*
* @return Current LogActor
*/
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
index 595822173f..925b161118 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
@@ -149,10 +149,7 @@ public class InternalBrokerBaseCase extends QpidTestCase
{
super.tearDown();
// Purge Any erroneously added actors
- while (CurrentActor.get() != null)
- {
- CurrentActor.remove();
- }
+ CurrentActor.removeAll();
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index 4b4417b6ef..b0bd8f8e97 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
@@ -211,15 +212,13 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
public void resubscribeSessions() throws JMSException, AMQException, FailoverException
{
List<AMQSession> sessions = new ArrayList<AMQSession>(_conn.getSessions().values());
- _logger.info(String.format("Resubscribing sessions = %s sessions.size=%s", sessions, sessions.size()));
+ _logger.info(String.format("Resubscribing sessions = %s sessions.size=%d", sessions, sessions.size()));
for (AMQSession s : sessions)
{
- ((AMQSession_0_10) s)._qpidConnection = _qpidConnection;
s.resubscribe();
}
}
-
public void closeConnection(long timeout) throws JMSException, AMQException
{
try
@@ -257,12 +256,14 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
ConnectionClose close = exc.getClose();
if (close == null)
{
+ _conn.getProtocolHandler().setFailoverLatch(new CountDownLatch(1));
+
try
{
if (_conn.firePreFailover(false) && _conn.attemptReconnection())
{
_conn.failoverPrep();
- _qpidConnection.resume();
+ _conn.resubscribeSessions();
_conn.fireFailoverComplete();
return;
}
@@ -271,6 +272,11 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
{
_logger.error("error during failover", e);
}
+ finally
+ {
+ _conn.getProtocolHandler().getFailoverLatch().countDown();
+ _conn.getProtocolHandler().setFailoverLatch(null);
+ }
}
ExceptionListener listener = _conn._exceptionListener;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 1eaccf53fc..517a7a5ce8 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -892,14 +892,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
public void resumed(Session ssn)
{
_qpidConnection = ssn.getConnection();
- try
- {
- resubscribe();
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
- }
}
public void message(Session ssn, MessageTransfer xfr)
@@ -942,6 +934,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
protected Long requestQueueDepth(AMQDestination amqd)
{
+ flushAcknowledgments();
return getQpidSession().queueQuery(amqd.getQueueName()).get().getMessageCount();
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
index ed2e96e83b..92e61984d2 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
@@ -21,6 +21,7 @@
package org.apache.qpid.client.message;
+import java.lang.ref.SoftReference;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
@@ -38,7 +39,6 @@ import javax.jms.Session;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQPInvalidClassException;
-import org.apache.qpid.collections.ReferenceMap;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQSession_0_10;
@@ -61,7 +61,7 @@ import org.apache.qpid.transport.ReplyTo;
*/
public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
{
- private static final Map<ReplyTo, Destination> _destinationCache = Collections.synchronizedMap(new ReferenceMap());
+ private static final Map<ReplyTo, SoftReference<Destination>> _destinationCache = Collections.synchronizedMap(new HashMap<ReplyTo, SoftReference<Destination>>());
public static final String JMS_TYPE = "x-jms-type";
@@ -229,22 +229,19 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
}
else
{
- Destination dest = _destinationCache.get(replyTo);
+ Destination dest = null;
+ SoftReference<Destination> ref = _destinationCache.get(replyTo);
+ if (ref != null)
+ {
+ dest = ref.get();
+ }
if (dest == null)
{
String exchange = replyTo.getExchange();
String routingKey = replyTo.getRoutingKey();
- dest = generateDestination(exchange == null ? new AMQShortString("") :
- new AMQShortString(exchange),
- routingKey == null ? new AMQShortString(""):
- new AMQShortString(routingKey));
-
-
-
-
-
- _destinationCache.put(replyTo, dest);
+ dest = generateDestination(new AMQShortString(exchange), new AMQShortString(routingKey));
+ _destinationCache.put(replyTo, new SoftReference<Destination>(dest));
}
return dest;
@@ -291,7 +288,7 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
}
final ReplyTo replyTo = new ReplyTo(amqd.getExchangeName().toString(), amqd.getRoutingKey().toString());
- _destinationCache.put(replyTo, destination);
+ _destinationCache.put(replyTo, new SoftReference<Destination>(destination));
_messageProps.setReplyTo(replyTo);
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java
index 4e97855a6f..491a7ac218 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java
@@ -142,4 +142,13 @@ public final class Binary
return str(ByteBuffer.wrap(bytes, offset, size));
}
+ public boolean hasExcessCapacity()
+ {
+ return size != bytes.length;
+ }
+
+ public Binary copy()
+ {
+ return new Binary(getBytes());
+ }
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
index b8e7616a37..f21df251da 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
@@ -35,9 +35,7 @@ import org.slf4j.LoggerFactory;
/**
* ServerDelegate
- *
*/
-
public class ServerDelegate extends ConnectionDelegate
{
protected static final Logger _logger = LoggerFactory.getLogger(ServerDelegate.class);
@@ -140,12 +138,12 @@ public class ServerDelegate extends ConnectionDelegate
protected int getHeartbeatMax()
{
- return Integer.MAX_VALUE;
+ return 0xFFFF;
}
protected int getChannelMax()
{
- return Integer.MAX_VALUE;
+ return 0xFFFF;
}
@Override
@@ -202,5 +200,4 @@ public class ServerDelegate extends ConnectionDelegate
ssn.sessionAttached(atc.getName());
ssn.setState(Session.State.OPEN);
}
-
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java
index a8a4997ae7..09ce6a7eb1 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java
@@ -143,10 +143,18 @@ abstract class AbstractDecoder implements Decoder
short size = readUint8();
Binary bin = get(size);
String str = str8cache.get(bin);
+
if (str == null)
{
str = decode(bin.array(), bin.offset(), bin.size(), "UTF-8");
- str8cache.put(bin, str);
+ if(bin.hasExcessCapacity())
+ {
+ str8cache.put(bin.copy(), str);
+ }
+ else
+ {
+ str8cache.put(bin, str);
+ }
}
return str;
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java
index 6f7a2fa3b2..10f67e1cd6 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java
@@ -41,6 +41,11 @@ public final class BBDecoder extends AbstractDecoder
this.in.order(ByteOrder.BIG_ENDIAN);
}
+ public void releaseBuffer()
+ {
+ in = null;
+ }
+
protected byte doGet()
{
return in.get();
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
index 357caa26e1..1a85ab88a5 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
@@ -20,38 +20,36 @@
*/
package org.apache.qpid.transport.network;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.nio.ByteBuffer;
-
-import org.apache.qpid.transport.codec.BBDecoder;
-import org.apache.qpid.transport.codec.Decoder;
-
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.Method;
import org.apache.qpid.transport.ProtocolError;
import org.apache.qpid.transport.ProtocolEvent;
import org.apache.qpid.transport.ProtocolHeader;
import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.SegmentType;
import org.apache.qpid.transport.Struct;
-
+import org.apache.qpid.transport.codec.BBDecoder;
/**
* Assembler
*
*/
-
public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
{
+ // Use a small array to store incomplete Methods for low-value channels, instead of allocating a huge
+ // array or always boxing the channelId and looking it up in the map. This value must be of the form 2^X - 1.
+ private static final int ARRAY_SIZE = 0xFF;
+ private final Method[] _incompleteMethodArray = new Method[ARRAY_SIZE + 1];
+ private final Map<Integer, Method> _incompleteMethodMap = new HashMap<Integer, Method>();
private final Receiver<ProtocolEvent> receiver;
private final Map<Integer,List<Frame>> segments;
- private final Method[] incomplete;
- private final ThreadLocal<BBDecoder> decoder = new ThreadLocal<BBDecoder>()
+ private static final ThreadLocal<BBDecoder> _decoder = new ThreadLocal<BBDecoder>()
{
public BBDecoder initialValue()
{
@@ -63,7 +61,6 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
{
this.receiver = receiver;
segments = new HashMap<Integer,List<Frame>>();
- incomplete = new Method[64*1024];
}
private int segmentKey(Frame frame)
@@ -169,7 +166,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
private void assemble(Frame frame, ByteBuffer segment)
{
- BBDecoder dec = decoder.get();
+ BBDecoder dec = _decoder.get();
dec.init(segment);
int channel = frame.getChannel();
@@ -192,7 +189,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
command.read(dec);
if (command.hasPayload())
{
- incomplete[channel] = command;
+ setIncompleteCommand(channel, command);
}
else
{
@@ -200,8 +197,8 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
}
break;
case HEADER:
- command = incomplete[channel];
- List<Struct> structs = new ArrayList(2);
+ command = getIncompleteCommand(channel);
+ List<Struct> structs = new ArrayList<Struct>(2);
while (dec.hasRemaining())
{
structs.add(dec.readStruct32());
@@ -209,19 +206,51 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate
command.setHeader(new Header(structs));
if (frame.isLastSegment())
{
- incomplete[channel] = null;
+ setIncompleteCommand(channel, null);
emit(channel, command);
}
break;
case BODY:
- command = incomplete[channel];
+ command = getIncompleteCommand(channel);
command.setBody(segment);
- incomplete[channel] = null;
+ setIncompleteCommand(channel, null);
emit(channel, command);
break;
default:
throw new IllegalStateException("unknown frame type: " + frame.getType());
}
+
+ dec.releaseBuffer();
}
+ private void setIncompleteCommand(int channelId, Method incomplete)
+ {
+ if ((channelId & ARRAY_SIZE) == channelId)
+ {
+ _incompleteMethodArray[channelId] = incomplete;
+ }
+ else
+ {
+ if(incomplete != null)
+ {
+ _incompleteMethodMap.put(channelId, incomplete);
+ }
+ else
+ {
+ _incompleteMethodMap.remove(channelId);
+ }
+ }
+ }
+
+ private Method getIncompleteCommand(int channelId)
+ {
+ if ((channelId & ARRAY_SIZE) == channelId)
+ {
+ return _incompleteMethodArray[channelId];
+ }
+ else
+ {
+ return _incompleteMethodMap.get(channelId);
+ }
+ }
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
index ab174b00b3..685034d1a9 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
@@ -40,21 +40,15 @@ import static java.lang.Math.min;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
-
/**
* Disassembler
- *
*/
-
-public final class Disassembler implements Sender<ProtocolEvent>,
- ProtocolDelegate<Void>
+public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelegate<Void>
{
-
private final Sender<ByteBuffer> sender;
private final int maxPayload;
- private final ByteBuffer header;
private final Object sendlock = new Object();
- private final ThreadLocal<BBEncoder> encoder = new ThreadLocal<BBEncoder>()
+ private final static ThreadLocal<BBEncoder> _encoder = new ThreadLocal<BBEncoder>()
{
public BBEncoder initialValue()
{
@@ -66,14 +60,10 @@ public final class Disassembler implements Sender<ProtocolEvent>,
{
if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024)
{
- throw new IllegalArgumentException
- ("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame);
+ throw new IllegalArgumentException("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame);
}
this.sender = sender;
this.maxPayload = maxFrame - HEADER_SIZE;
- this.header = ByteBuffer.allocate(HEADER_SIZE);
- this.header.order(ByteOrder.BIG_ENDIAN);
-
}
public void send(ProtocolEvent event)
@@ -101,25 +91,27 @@ public final class Disassembler implements Sender<ProtocolEvent>,
{
synchronized (sendlock)
{
- header.put(0, flags);
- header.put(1, type);
- header.putShort(2, (short) (size + HEADER_SIZE));
- header.put(5, track);
- header.putShort(6, (short) channel);
-
- header.rewind();
-
- sender.send(header);
+ ByteBuffer data = ByteBuffer.allocate(size + HEADER_SIZE);
+ data.order(ByteOrder.BIG_ENDIAN);
+
+ data.put(0, flags);
+ data.put(1, type);
+ data.putShort(2, (short) (size + HEADER_SIZE));
+ data.put(5, track);
+ data.putShort(6, (short) channel);
+ data.position(HEADER_SIZE);
int limit = buf.limit();
buf.limit(buf.position() + size);
- sender.send(buf);
+ data.put(buf);
buf.limit(limit);
+
+ data.rewind();
+ sender.send(data);
}
}
- private void fragment(byte flags, SegmentType type, ProtocolEvent event,
- ByteBuffer buf)
+ private void fragment(byte flags, SegmentType type, ProtocolEvent event, ByteBuffer buf)
{
byte typeb = (byte) type.getValue();
byte track = event.getEncodedTrack() == Frame.L4 ? (byte) 1 : (byte) 0;
@@ -170,17 +162,9 @@ public final class Disassembler implements Sender<ProtocolEvent>,
method(method, SegmentType.COMMAND);
}
- private ByteBuffer copy(ByteBuffer src)
- {
- ByteBuffer buf = ByteBuffer.allocate(src.remaining());
- buf.put(src);
- buf.flip();
- return buf;
- }
-
private void method(Method method, SegmentType type)
{
- BBEncoder enc = encoder.get();
+ BBEncoder enc = _encoder.get();
enc.init();
enc.writeUint16(method.getEncodedType());
if (type == SegmentType.COMMAND)
@@ -227,8 +211,7 @@ public final class Disassembler implements Sender<ProtocolEvent>,
if (payload)
{
ByteBuffer body = method.getBody();
- fragment(body == null ? LAST_SEG : 0x0, SegmentType.HEADER,
- method, headerSeg);
+ fragment(body == null ? LAST_SEG : 0x0, SegmentType.HEADER, method, headerSeg);
if (body != null)
{
fragment(LAST_SEG, SegmentType.BODY, method, body);
@@ -240,7 +223,7 @@ public final class Disassembler implements Sender<ProtocolEvent>,
public void error(Void v, ProtocolError error)
{
- throw new IllegalArgumentException("" + error);
+ throw new IllegalArgumentException(String.valueOf(error));
}
public void setIdleTimeout(int i)
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java
index 6211dd8e70..74f50e8659 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java
@@ -21,30 +21,18 @@
package org.apache.qpid.server.queue;
-import junit.framework.TestCase;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.util.InternalBrokerBaseCase;
-
import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
-import javax.naming.Context;
-import javax.naming.NamingException;
-import javax.naming.spi.InitialContextFactory;
-import java.util.Hashtable;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
/**
* Test Case to ensure that messages are correctly returned.
@@ -52,19 +40,12 @@ import java.util.Hashtable;
* - The message is returned.
* - The broker doesn't leak memory.
* - The broker's state is correct after test.
- *
- * Why is this hardcoded to InVM testing, should be converted to QTC.
*/
-public class QueueDepthWithSelectorTest extends InternalBrokerBaseCase
+public class QueueDepthWithSelectorTest extends QpidBrokerTestCase
{
- protected static final Logger _logger = Logger.getLogger(QueueDepthWithSelectorTest.class);
-
- protected final String BROKER = "vm://:"+ApplicationRegistry.DEFAULT_INSTANCE;
protected final String VHOST = "test";
protected final String QUEUE = this.getClass().getName();
- protected Context _context;
-
protected Connection _clientConnection;
protected Connection _producerConnection;
private Session _clientSession;
@@ -82,47 +63,21 @@ public class QueueDepthWithSelectorTest extends InternalBrokerBaseCase
public void setUp() throws Exception
{
super.setUp();
- TransportConnection.createVMBroker(ApplicationRegistry.DEFAULT_INSTANCE);
-
- System.err.println("amqj.logging.level:" + System.getProperty("amqj.logging.level"));
- System.err.println("_logger.level:" + _logger.getLevel());
- System.err.println("_logger.isE-Error:" + _logger.isEnabledFor(Level.ERROR));
- System.err.println("_logger.isE-Warn:" + _logger.isEnabledFor(Level.WARN));
- System.err.println("_logger.isInfo:" + _logger.isInfoEnabled() + ":" + _logger.isEnabledFor(Level.INFO));
- System.err.println("_logger.isDebug:" + _logger.isDebugEnabled() + ":" + _logger.isEnabledFor(Level.DEBUG));
- System.err.println("_logger.isTrace:" + _logger.isTraceEnabled() + ":" + _logger.isEnabledFor(Level.TRACE));
-
- System.err.println(Logger.getRootLogger().getLoggerRepository());
-
- InitialContextFactory factory = new PropertiesFileInitialContextFactory();
-
- Hashtable<String, String> env = new Hashtable<String, String>();
-
- env.put("connectionfactory.connection", "amqp://guest:guest@TTL_TEST_ID/" + VHOST + "?brokerlist='" + BROKER + "'");
- env.put("queue.queue", QUEUE);
-
- _context = factory.getInitialContext(env);
_messages = new Message[MSG_COUNT];
- _queue = (Queue) _context.lookup("queue");
- init();
- }
-
- @Override
- public void tearDown() throws Exception
- {
- if (_producerConnection != null)
- {
- _producerConnection.close();
- }
-
- if (_clientConnection != null)
- {
- _clientConnection.close();
- }
+ _queue = getTestQueue();
+
+ //Create Producer
+ _producerConnection = getConnection();
+ _producerConnection.start();
+ _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _producer = _producerSession.createProducer(_queue);
- TransportConnection.killVMBroker(ApplicationRegistry.DEFAULT_INSTANCE);
- super.tearDown();
+ // Create consumer
+ _clientConnection = getConnection();
+ _clientConnection.start();
+ _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _consumer = _clientSession.createConsumer(_queue, "key = 23");
}
public void test() throws Exception
@@ -139,7 +94,8 @@ public class QueueDepthWithSelectorTest extends InternalBrokerBaseCase
//Verify we get all the messages.
_logger.info("Verifying messages");
- verifyAllMessagesRecevied(0);
+ verifyAllMessagesRecevied(50);
+ verifyBrokerState(0);
//Close the connection.. .giving the broker time to clean up its state.
_clientConnection.close();
@@ -149,39 +105,18 @@ public class QueueDepthWithSelectorTest extends InternalBrokerBaseCase
verifyBrokerState(0);
}
- protected void init() throws NamingException, JMSException, AMQException
- {
- //Create Producer
- _producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
- _producerConnection.start();
- _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- _producer = _producerSession.createProducer(_queue);
-
- // Create consumer
- _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
- _clientConnection.start();
- _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- _consumer = _clientSession.createConsumer(_queue, "key = 23");
- }
-
protected void verifyBrokerState(int expectedDepth)
{
try
{
- _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
-
- _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
- catch (Exception e)
- {
- fail(e.getMessage());
- }
+ Connection connection = getConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- try
- {
Thread.sleep(2000);
- long queueDepth = ((AMQSession) _clientSession).getQueueDepth((AMQDestination) _queue);
+ long queueDepth = ((AMQSession) session).getQueueDepth((AMQDestination) _queue);
assertEquals("Session reports Queue depth not as expected", expectedDepth, queueDepth);
+
+ connection.close();
}
catch (InterruptedException e)
{
@@ -191,34 +126,22 @@ public class QueueDepthWithSelectorTest extends InternalBrokerBaseCase
{
fail(e.getMessage());
}
- finally
+ catch (Exception e)
{
- try
- {
- _clientConnection.close();
- }
- catch (JMSException e)
- {
- fail(e.getMessage());
- }
+ fail(e.getMessage());
}
-
}
protected void verifyAllMessagesRecevied(int expectedDepth) throws Exception
{
-
boolean[] msgIdRecevied = new boolean[MSG_COUNT];
- for (int i = 0; i < MSG_COUNT; i++)
+ for (int i = 0; i < expectedDepth; i++)
{
_messages[i] = _consumer.receive(1000);
assertNotNull("should have received a message but didn't", _messages[i]);
}
-
- long queueDepth = ((AMQSession) _clientSession).getQueueDepth((AMQDestination) _queue);
- assertEquals("Session reports Queue depth not as expected", expectedDepth, queueDepth);
-
+
//Check received messages
int msgId = 0;
for (Message msg : _messages)
@@ -231,7 +154,7 @@ public class QueueDepthWithSelectorTest extends InternalBrokerBaseCase
}
//Check all received
- for (msgId = 0; msgId < MSG_COUNT; msgId++)
+ for (msgId = 0; msgId < expectedDepth; msgId++)
{
assertTrue("Message " + msgId + " not received.", msgIdRecevied[msgId]);
}
@@ -241,9 +164,6 @@ public class QueueDepthWithSelectorTest extends InternalBrokerBaseCase
* Get the next message putting the given count into the intProperties as ID.
*
* @param msgNo the message count to store as ID.
- *
- * @return
- *
* @throws JMSException
*/
protected Message nextMessage(int msgNo) throws JMSException
@@ -253,5 +173,4 @@ public class QueueDepthWithSelectorTest extends InternalBrokerBaseCase
send.setIntProperty("key", 23);
return send;
}
-
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
index f54b022c09..97d825177c 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
@@ -20,8 +20,8 @@
*/
package org.apache.qpid.test.client;
-import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.test.utils.FailoverBaseCase;
@@ -41,8 +41,6 @@ import java.util.Random;
public class QueueBrowserAutoAckTest extends FailoverBaseCase
{
- private static final Logger _logger = Logger.getLogger(QueueBrowserAutoAckTest.class);
-
protected Connection _clientConnection;
protected Session _clientSession;
protected Queue _queue;
@@ -53,10 +51,8 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase
{
super.setUp();
-
//Create Client
_clientConnection = getConnection();
-
_clientConnection.start();
setupSession();
@@ -395,7 +391,6 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase
closeBrowserBeforeAfterGetNext(10);
validate(messages);
-
}
/**
@@ -454,19 +449,15 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase
{
int messages = 5;
-
sendMessages("connection1", messages);
if (!CLUSTERED)
{
sendMessages("connection2", messages);
}
-
checkQueueDepth(messages);
-
_logger.info("Creating Queue Browser");
-
QueueBrowser queueBrowser = _clientSession.createBrowser(_queue);
long queueDepth = 0;
@@ -477,19 +468,17 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase
}
catch (AMQException e)
{
+ fail("Caught exception getting queue depth: " + e.getMessage());
}
assertEquals("Session reports Queue depth not as expected", messages, queueDepth);
-
int msgCount = 0;
-
int failPoint = 0;
failPoint = new Random().nextInt(messages) + 1;
Enumeration msgs = queueBrowser.getEnumeration();
-
while (msgs.hasMoreElements())
{
msgs.nextElement();
@@ -536,5 +525,4 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase
//Validate all messages still on Broker 1
validate(messages);
}
-
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/UTF8Test.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/UTF8Test.java
index 81089a4dfc..fe929b4965 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/UTF8Test.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/UTF8Test.java
@@ -34,9 +34,8 @@ import java.io.*;
/**
- * @author Apache Software Foundation
- * This test makes sure that utf8 characters can be used for
- * specifying exchange, queue name and routing key.
+ * This test makes sure that utf8 characters can be used for
+ * specifying exchange, queue name and routing key.
*
* those tests are related to qpid-1384
*/
@@ -44,7 +43,6 @@ public class UTF8Test extends QpidBrokerTestCase
{
private static final Logger _logger = LoggerFactory.getLogger(UTF8Test.class);
-
public void testPlainEn() throws Exception
{
invoke("UTF8En");
@@ -56,7 +54,6 @@ public class UTF8Test extends QpidBrokerTestCase
invoke("UTF8Jp");
}
-
private void invoke(String name) throws Exception
{
String path = System.getProperties().getProperty("QPID_HOME");
@@ -65,6 +62,7 @@ public class UTF8Test extends QpidBrokerTestCase
runTest(in.readLine(), in.readLine(), in.readLine(), in.readLine());
in.close();
}
+
private void runTest(String exchangeName, String queueName, String routingKey, String data) throws Exception
{
_logger.info("Running test for exchange: " + exchangeName
@@ -89,26 +87,17 @@ public class UTF8Test extends QpidBrokerTestCase
private void declareQueue(String exch, String routkey, String qname) throws Exception
{
- Connection conn = new Connection();
- if (!_broker.equals(QpidBrokerTestCase.EXTERNAL) && !isBroker08())
- {
- conn.connect("localhost", QpidBrokerTestCase.DEFAULT_PORT, "test", "guest", "guest",false);
- }
- else
- {
- throw new Exception("unsupported test " +
- "configuration. broker: " + _broker + " version > 0.10 "+ !isBroker08() + " This test must be run on a local broker using protocol 0.10 or higher.");
- }
- Session sess = conn.createSession(0);
- sess.exchangeDeclare(exch, "direct", null, null);
- sess.queueDeclare(qname, null, null);
- sess.exchangeBind(qname, exch, routkey, null);
- sess.sync();
- conn.close();
+ Connection conn = new Connection();
+ conn.connect("localhost", QpidBrokerTestCase.DEFAULT_PORT, "test", "guest", "guest",false);
+ Session sess = conn.createSession(0);
+ sess.exchangeDeclare(exch, "direct", null, null);
+ sess.queueDeclare(qname, null, null);
+ sess.exchangeBind(qname, exch, routkey, null);
+ sess.sync();
+ conn.close();
}
- private Destination getDestination(String exch, String routkey, String qname)
- throws Exception
+ private Destination getDestination(String exch, String routkey, String qname) throws Exception
{
Properties props = new Properties();
props.setProperty("destination.directUTF8Queue",