diff options
| author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-02-19 13:51:04 +0000 |
|---|---|---|
| committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-02-19 13:51:04 +0000 |
| commit | 5b5e8fc7d7ed98f91e57db5c408d91796302da45 (patch) | |
| tree | 9d3d482a9705af66470fb31eb100e72f857165e0 /qpid/java | |
| parent | 2790ce755170f85afc66183e25f8165ef9c1dd0e (diff) | |
| download | qpid-python-qpid-2935.tar.gz | |
QPID-2935: merge latest trunkqpid-2935
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2935@1072330 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
15 files changed, 190 insertions, 247 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java index c0afae0773..2c8fd737c3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java @@ -712,6 +712,25 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED); } + public BrokerSchema.BrokerClass.CreateMethodResponseCommand create(final BrokerSchema.BrokerClass.CreateMethodResponseCommandFactory factory, + final String type, + final String name, + final Map properties, + final java.lang.Boolean lenient) + { + //TODO: + return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED); + } + + public BrokerSchema.BrokerClass.DeleteMethodResponseCommand delete(final BrokerSchema.BrokerClass.DeleteMethodResponseCommandFactory factory, + final String type, + final String name, + final Map options) + { + //TODO: + return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED); + } + public UUID getId() { return _obj.getId(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java index 3d31a705fe..2ebbfeb734 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java @@ -21,9 +21,6 @@ package org.apache.qpid.server.logging.actors; import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.LogMessage; -import org.apache.qpid.server.logging.RootMessageLogger; import java.util.EmptyStackException; import java.util.Stack; @@ -72,7 +69,7 @@ public class CurrentActor private static LogActor _defaultActor; /** - * Set a new LogActor to be the Current Actor + * Set a new {@link LogActor} to be the Current Actor * <p/> * This pushes the Actor in to the LIFO Queue * @@ -85,7 +82,16 @@ public class CurrentActor } /** - * Remove the current LogActor. + * Remove all {@link LogActor}s + */ + public static void removeAll() + { + Stack<LogActor> stack = _currentActor.get(); + stack.clear(); + } + + /** + * Remove the current {@link LogActor}. * <p/> * Calling remove without calling set will result in an EmptyStackException. */ @@ -96,9 +102,7 @@ public class CurrentActor } /** - * Return the current head of the list of LogActors. - * <p/> - * If there has been no set call then this will return Null. + * Return the current head of the list of {@link LogActor}s. * * @return Current LogActor */ diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java index 595822173f..925b161118 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java @@ -149,10 +149,7 @@ public class InternalBrokerBaseCase extends QpidTestCase { super.tearDown(); // Purge Any erroneously added actors - while (CurrentActor.get() != null) - { - CurrentActor.remove(); - } + CurrentActor.removeAll(); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 4b4417b6ef..b0bd8f8e97 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; import javax.jms.ExceptionListener; import javax.jms.JMSException; @@ -211,15 +212,13 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec public void resubscribeSessions() throws JMSException, AMQException, FailoverException { List<AMQSession> sessions = new ArrayList<AMQSession>(_conn.getSessions().values()); - _logger.info(String.format("Resubscribing sessions = %s sessions.size=%s", sessions, sessions.size())); + _logger.info(String.format("Resubscribing sessions = %s sessions.size=%d", sessions, sessions.size())); for (AMQSession s : sessions) { - ((AMQSession_0_10) s)._qpidConnection = _qpidConnection; s.resubscribe(); } } - public void closeConnection(long timeout) throws JMSException, AMQException { try @@ -257,12 +256,14 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec ConnectionClose close = exc.getClose(); if (close == null) { + _conn.getProtocolHandler().setFailoverLatch(new CountDownLatch(1)); + try { if (_conn.firePreFailover(false) && _conn.attemptReconnection()) { _conn.failoverPrep(); - _qpidConnection.resume(); + _conn.resubscribeSessions(); _conn.fireFailoverComplete(); return; } @@ -271,6 +272,11 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec { _logger.error("error during failover", e); } + finally + { + _conn.getProtocolHandler().getFailoverLatch().countDown(); + _conn.getProtocolHandler().setFailoverLatch(null); + } } ExceptionListener listener = _conn._exceptionListener; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 1eaccf53fc..517a7a5ce8 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -892,14 +892,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic public void resumed(Session ssn) { _qpidConnection = ssn.getConnection(); - try - { - resubscribe(); - } - catch (AMQException e) - { - throw new RuntimeException(e); - } } public void message(Session ssn, MessageTransfer xfr) @@ -942,6 +934,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic protected Long requestQueueDepth(AMQDestination amqd) { + flushAcknowledgments(); return getQpidSession().queueQuery(amqd.getQueueName()).get().getMessageCount(); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java index ed2e96e83b..92e61984d2 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java @@ -21,6 +21,7 @@ package org.apache.qpid.client.message; +import java.lang.ref.SoftReference; import java.util.Collections; import java.util.Enumeration; import java.util.HashMap; @@ -38,7 +39,6 @@ import javax.jms.Session; import org.apache.qpid.AMQException; import org.apache.qpid.AMQPInvalidClassException; -import org.apache.qpid.collections.ReferenceMap; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQSession_0_10; @@ -61,7 +61,7 @@ import org.apache.qpid.transport.ReplyTo; */ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate { - private static final Map<ReplyTo, Destination> _destinationCache = Collections.synchronizedMap(new ReferenceMap()); + private static final Map<ReplyTo, SoftReference<Destination>> _destinationCache = Collections.synchronizedMap(new HashMap<ReplyTo, SoftReference<Destination>>()); public static final String JMS_TYPE = "x-jms-type"; @@ -229,22 +229,19 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate } else { - Destination dest = _destinationCache.get(replyTo); + Destination dest = null; + SoftReference<Destination> ref = _destinationCache.get(replyTo); + if (ref != null) + { + dest = ref.get(); + } if (dest == null) { String exchange = replyTo.getExchange(); String routingKey = replyTo.getRoutingKey(); - dest = generateDestination(exchange == null ? new AMQShortString("") : - new AMQShortString(exchange), - routingKey == null ? new AMQShortString(""): - new AMQShortString(routingKey)); - - - - - - _destinationCache.put(replyTo, dest); + dest = generateDestination(new AMQShortString(exchange), new AMQShortString(routingKey)); + _destinationCache.put(replyTo, new SoftReference<Destination>(dest)); } return dest; @@ -291,7 +288,7 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate } final ReplyTo replyTo = new ReplyTo(amqd.getExchangeName().toString(), amqd.getRoutingKey().toString()); - _destinationCache.put(replyTo, destination); + _destinationCache.put(replyTo, new SoftReference<Destination>(destination)); _messageProps.setReplyTo(replyTo); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java index 4e97855a6f..491a7ac218 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Binary.java @@ -142,4 +142,13 @@ public final class Binary return str(ByteBuffer.wrap(bytes, offset, size)); } + public boolean hasExcessCapacity() + { + return size != bytes.length; + } + + public Binary copy() + { + return new Binary(getBytes()); + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java index b8e7616a37..f21df251da 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java @@ -35,9 +35,7 @@ import org.slf4j.LoggerFactory; /** * ServerDelegate - * */ - public class ServerDelegate extends ConnectionDelegate { protected static final Logger _logger = LoggerFactory.getLogger(ServerDelegate.class); @@ -140,12 +138,12 @@ public class ServerDelegate extends ConnectionDelegate protected int getHeartbeatMax() { - return Integer.MAX_VALUE; + return 0xFFFF; } protected int getChannelMax() { - return Integer.MAX_VALUE; + return 0xFFFF; } @Override @@ -202,5 +200,4 @@ public class ServerDelegate extends ConnectionDelegate ssn.sessionAttached(atc.getName()); ssn.setState(Session.State.OPEN); } - } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java index a8a4997ae7..09ce6a7eb1 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractDecoder.java @@ -143,10 +143,18 @@ abstract class AbstractDecoder implements Decoder short size = readUint8(); Binary bin = get(size); String str = str8cache.get(bin); + if (str == null) { str = decode(bin.array(), bin.offset(), bin.size(), "UTF-8"); - str8cache.put(bin, str); + if(bin.hasExcessCapacity()) + { + str8cache.put(bin.copy(), str); + } + else + { + str8cache.put(bin, str); + } } return str; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java index 6f7a2fa3b2..10f67e1cd6 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java @@ -41,6 +41,11 @@ public final class BBDecoder extends AbstractDecoder this.in.order(ByteOrder.BIG_ENDIAN); } + public void releaseBuffer() + { + in = null; + } + protected byte doGet() { return in.get(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java index 357caa26e1..1a85ab88a5 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java @@ -20,38 +20,36 @@ */ package org.apache.qpid.transport.network; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.nio.ByteBuffer; - -import org.apache.qpid.transport.codec.BBDecoder; -import org.apache.qpid.transport.codec.Decoder; - import org.apache.qpid.transport.Header; import org.apache.qpid.transport.Method; import org.apache.qpid.transport.ProtocolError; import org.apache.qpid.transport.ProtocolEvent; import org.apache.qpid.transport.ProtocolHeader; import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.SegmentType; import org.apache.qpid.transport.Struct; - +import org.apache.qpid.transport.codec.BBDecoder; /** * Assembler * */ - public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate { + // Use a small array to store incomplete Methods for low-value channels, instead of allocating a huge + // array or always boxing the channelId and looking it up in the map. This value must be of the form 2^X - 1. + private static final int ARRAY_SIZE = 0xFF; + private final Method[] _incompleteMethodArray = new Method[ARRAY_SIZE + 1]; + private final Map<Integer, Method> _incompleteMethodMap = new HashMap<Integer, Method>(); private final Receiver<ProtocolEvent> receiver; private final Map<Integer,List<Frame>> segments; - private final Method[] incomplete; - private final ThreadLocal<BBDecoder> decoder = new ThreadLocal<BBDecoder>() + private static final ThreadLocal<BBDecoder> _decoder = new ThreadLocal<BBDecoder>() { public BBDecoder initialValue() { @@ -63,7 +61,6 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate { this.receiver = receiver; segments = new HashMap<Integer,List<Frame>>(); - incomplete = new Method[64*1024]; } private int segmentKey(Frame frame) @@ -169,7 +166,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate private void assemble(Frame frame, ByteBuffer segment) { - BBDecoder dec = decoder.get(); + BBDecoder dec = _decoder.get(); dec.init(segment); int channel = frame.getChannel(); @@ -192,7 +189,7 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate command.read(dec); if (command.hasPayload()) { - incomplete[channel] = command; + setIncompleteCommand(channel, command); } else { @@ -200,8 +197,8 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate } break; case HEADER: - command = incomplete[channel]; - List<Struct> structs = new ArrayList(2); + command = getIncompleteCommand(channel); + List<Struct> structs = new ArrayList<Struct>(2); while (dec.hasRemaining()) { structs.add(dec.readStruct32()); @@ -209,19 +206,51 @@ public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate command.setHeader(new Header(structs)); if (frame.isLastSegment()) { - incomplete[channel] = null; + setIncompleteCommand(channel, null); emit(channel, command); } break; case BODY: - command = incomplete[channel]; + command = getIncompleteCommand(channel); command.setBody(segment); - incomplete[channel] = null; + setIncompleteCommand(channel, null); emit(channel, command); break; default: throw new IllegalStateException("unknown frame type: " + frame.getType()); } + + dec.releaseBuffer(); } + private void setIncompleteCommand(int channelId, Method incomplete) + { + if ((channelId & ARRAY_SIZE) == channelId) + { + _incompleteMethodArray[channelId] = incomplete; + } + else + { + if(incomplete != null) + { + _incompleteMethodMap.put(channelId, incomplete); + } + else + { + _incompleteMethodMap.remove(channelId); + } + } + } + + private Method getIncompleteCommand(int channelId) + { + if ((channelId & ARRAY_SIZE) == channelId) + { + return _incompleteMethodArray[channelId]; + } + else + { + return _incompleteMethodMap.get(channelId); + } + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java index ab174b00b3..685034d1a9 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java @@ -40,21 +40,15 @@ import static java.lang.Math.min; import java.nio.ByteBuffer; import java.nio.ByteOrder; - /** * Disassembler - * */ - -public final class Disassembler implements Sender<ProtocolEvent>, - ProtocolDelegate<Void> +public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelegate<Void> { - private final Sender<ByteBuffer> sender; private final int maxPayload; - private final ByteBuffer header; private final Object sendlock = new Object(); - private final ThreadLocal<BBEncoder> encoder = new ThreadLocal<BBEncoder>() + private final static ThreadLocal<BBEncoder> _encoder = new ThreadLocal<BBEncoder>() { public BBEncoder initialValue() { @@ -66,14 +60,10 @@ public final class Disassembler implements Sender<ProtocolEvent>, { if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024) { - throw new IllegalArgumentException - ("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame); + throw new IllegalArgumentException("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame); } this.sender = sender; this.maxPayload = maxFrame - HEADER_SIZE; - this.header = ByteBuffer.allocate(HEADER_SIZE); - this.header.order(ByteOrder.BIG_ENDIAN); - } public void send(ProtocolEvent event) @@ -101,25 +91,27 @@ public final class Disassembler implements Sender<ProtocolEvent>, { synchronized (sendlock) { - header.put(0, flags); - header.put(1, type); - header.putShort(2, (short) (size + HEADER_SIZE)); - header.put(5, track); - header.putShort(6, (short) channel); - - header.rewind(); - - sender.send(header); + ByteBuffer data = ByteBuffer.allocate(size + HEADER_SIZE); + data.order(ByteOrder.BIG_ENDIAN); + + data.put(0, flags); + data.put(1, type); + data.putShort(2, (short) (size + HEADER_SIZE)); + data.put(5, track); + data.putShort(6, (short) channel); + data.position(HEADER_SIZE); int limit = buf.limit(); buf.limit(buf.position() + size); - sender.send(buf); + data.put(buf); buf.limit(limit); + + data.rewind(); + sender.send(data); } } - private void fragment(byte flags, SegmentType type, ProtocolEvent event, - ByteBuffer buf) + private void fragment(byte flags, SegmentType type, ProtocolEvent event, ByteBuffer buf) { byte typeb = (byte) type.getValue(); byte track = event.getEncodedTrack() == Frame.L4 ? (byte) 1 : (byte) 0; @@ -170,17 +162,9 @@ public final class Disassembler implements Sender<ProtocolEvent>, method(method, SegmentType.COMMAND); } - private ByteBuffer copy(ByteBuffer src) - { - ByteBuffer buf = ByteBuffer.allocate(src.remaining()); - buf.put(src); - buf.flip(); - return buf; - } - private void method(Method method, SegmentType type) { - BBEncoder enc = encoder.get(); + BBEncoder enc = _encoder.get(); enc.init(); enc.writeUint16(method.getEncodedType()); if (type == SegmentType.COMMAND) @@ -227,8 +211,7 @@ public final class Disassembler implements Sender<ProtocolEvent>, if (payload) { ByteBuffer body = method.getBody(); - fragment(body == null ? LAST_SEG : 0x0, SegmentType.HEADER, - method, headerSeg); + fragment(body == null ? LAST_SEG : 0x0, SegmentType.HEADER, method, headerSeg); if (body != null) { fragment(LAST_SEG, SegmentType.BODY, method, body); @@ -240,7 +223,7 @@ public final class Disassembler implements Sender<ProtocolEvent>, public void error(Void v, ProtocolError error) { - throw new IllegalArgumentException("" + error); + throw new IllegalArgumentException(String.valueOf(error)); } public void setIdleTimeout(int i) diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java index 6211dd8e70..74f50e8659 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java @@ -21,30 +21,18 @@ package org.apache.qpid.server.queue; -import junit.framework.TestCase; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; - -import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.transport.TransportConnection; -import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.util.InternalBrokerBaseCase; - import javax.jms.Connection; -import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; -import javax.naming.Context; -import javax.naming.NamingException; -import javax.naming.spi.InitialContextFactory; -import java.util.Hashtable; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.test.utils.QpidBrokerTestCase; /** * Test Case to ensure that messages are correctly returned. @@ -52,19 +40,12 @@ import java.util.Hashtable; * - The message is returned. * - The broker doesn't leak memory. * - The broker's state is correct after test. - * - * Why is this hardcoded to InVM testing, should be converted to QTC. */ -public class QueueDepthWithSelectorTest extends InternalBrokerBaseCase +public class QueueDepthWithSelectorTest extends QpidBrokerTestCase { - protected static final Logger _logger = Logger.getLogger(QueueDepthWithSelectorTest.class); - - protected final String BROKER = "vm://:"+ApplicationRegistry.DEFAULT_INSTANCE; protected final String VHOST = "test"; protected final String QUEUE = this.getClass().getName(); - protected Context _context; - protected Connection _clientConnection; protected Connection _producerConnection; private Session _clientSession; @@ -82,47 +63,21 @@ public class QueueDepthWithSelectorTest extends InternalBrokerBaseCase public void setUp() throws Exception { super.setUp(); - TransportConnection.createVMBroker(ApplicationRegistry.DEFAULT_INSTANCE); - - System.err.println("amqj.logging.level:" + System.getProperty("amqj.logging.level")); - System.err.println("_logger.level:" + _logger.getLevel()); - System.err.println("_logger.isE-Error:" + _logger.isEnabledFor(Level.ERROR)); - System.err.println("_logger.isE-Warn:" + _logger.isEnabledFor(Level.WARN)); - System.err.println("_logger.isInfo:" + _logger.isInfoEnabled() + ":" + _logger.isEnabledFor(Level.INFO)); - System.err.println("_logger.isDebug:" + _logger.isDebugEnabled() + ":" + _logger.isEnabledFor(Level.DEBUG)); - System.err.println("_logger.isTrace:" + _logger.isTraceEnabled() + ":" + _logger.isEnabledFor(Level.TRACE)); - - System.err.println(Logger.getRootLogger().getLoggerRepository()); - - InitialContextFactory factory = new PropertiesFileInitialContextFactory(); - - Hashtable<String, String> env = new Hashtable<String, String>(); - - env.put("connectionfactory.connection", "amqp://guest:guest@TTL_TEST_ID/" + VHOST + "?brokerlist='" + BROKER + "'"); - env.put("queue.queue", QUEUE); - - _context = factory.getInitialContext(env); _messages = new Message[MSG_COUNT]; - _queue = (Queue) _context.lookup("queue"); - init(); - } - - @Override - public void tearDown() throws Exception - { - if (_producerConnection != null) - { - _producerConnection.close(); - } - - if (_clientConnection != null) - { - _clientConnection.close(); - } + _queue = getTestQueue(); + + //Create Producer + _producerConnection = getConnection(); + _producerConnection.start(); + _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _producer = _producerSession.createProducer(_queue); - TransportConnection.killVMBroker(ApplicationRegistry.DEFAULT_INSTANCE); - super.tearDown(); + // Create consumer + _clientConnection = getConnection(); + _clientConnection.start(); + _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _consumer = _clientSession.createConsumer(_queue, "key = 23"); } public void test() throws Exception @@ -139,7 +94,8 @@ public class QueueDepthWithSelectorTest extends InternalBrokerBaseCase //Verify we get all the messages. _logger.info("Verifying messages"); - verifyAllMessagesRecevied(0); + verifyAllMessagesRecevied(50); + verifyBrokerState(0); //Close the connection.. .giving the broker time to clean up its state. _clientConnection.close(); @@ -149,39 +105,18 @@ public class QueueDepthWithSelectorTest extends InternalBrokerBaseCase verifyBrokerState(0); } - protected void init() throws NamingException, JMSException, AMQException - { - //Create Producer - _producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); - _producerConnection.start(); - _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - _producer = _producerSession.createProducer(_queue); - - // Create consumer - _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); - _clientConnection.start(); - _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - _consumer = _clientSession.createConsumer(_queue, "key = 23"); - } - protected void verifyBrokerState(int expectedDepth) { try { - _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); - - _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - } - catch (Exception e) - { - fail(e.getMessage()); - } + Connection connection = getConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - try - { Thread.sleep(2000); - long queueDepth = ((AMQSession) _clientSession).getQueueDepth((AMQDestination) _queue); + long queueDepth = ((AMQSession) session).getQueueDepth((AMQDestination) _queue); assertEquals("Session reports Queue depth not as expected", expectedDepth, queueDepth); + + connection.close(); } catch (InterruptedException e) { @@ -191,34 +126,22 @@ public class QueueDepthWithSelectorTest extends InternalBrokerBaseCase { fail(e.getMessage()); } - finally + catch (Exception e) { - try - { - _clientConnection.close(); - } - catch (JMSException e) - { - fail(e.getMessage()); - } + fail(e.getMessage()); } - } protected void verifyAllMessagesRecevied(int expectedDepth) throws Exception { - boolean[] msgIdRecevied = new boolean[MSG_COUNT]; - for (int i = 0; i < MSG_COUNT; i++) + for (int i = 0; i < expectedDepth; i++) { _messages[i] = _consumer.receive(1000); assertNotNull("should have received a message but didn't", _messages[i]); } - - long queueDepth = ((AMQSession) _clientSession).getQueueDepth((AMQDestination) _queue); - assertEquals("Session reports Queue depth not as expected", expectedDepth, queueDepth); - + //Check received messages int msgId = 0; for (Message msg : _messages) @@ -231,7 +154,7 @@ public class QueueDepthWithSelectorTest extends InternalBrokerBaseCase } //Check all received - for (msgId = 0; msgId < MSG_COUNT; msgId++) + for (msgId = 0; msgId < expectedDepth; msgId++) { assertTrue("Message " + msgId + " not received.", msgIdRecevied[msgId]); } @@ -241,9 +164,6 @@ public class QueueDepthWithSelectorTest extends InternalBrokerBaseCase * Get the next message putting the given count into the intProperties as ID. * * @param msgNo the message count to store as ID. - * - * @return - * * @throws JMSException */ protected Message nextMessage(int msgNo) throws JMSException @@ -253,5 +173,4 @@ public class QueueDepthWithSelectorTest extends InternalBrokerBaseCase send.setIntProperty("key", 23); return send; } - } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java index f54b022c09..97d825177c 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java @@ -20,8 +20,8 @@ */ package org.apache.qpid.test.client; -import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession; import org.apache.qpid.test.utils.FailoverBaseCase; @@ -41,8 +41,6 @@ import java.util.Random; public class QueueBrowserAutoAckTest extends FailoverBaseCase { - private static final Logger _logger = Logger.getLogger(QueueBrowserAutoAckTest.class); - protected Connection _clientConnection; protected Session _clientSession; protected Queue _queue; @@ -53,10 +51,8 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase { super.setUp(); - //Create Client _clientConnection = getConnection(); - _clientConnection.start(); setupSession(); @@ -395,7 +391,6 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase closeBrowserBeforeAfterGetNext(10); validate(messages); - } /** @@ -454,19 +449,15 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase { int messages = 5; - sendMessages("connection1", messages); if (!CLUSTERED) { sendMessages("connection2", messages); } - checkQueueDepth(messages); - _logger.info("Creating Queue Browser"); - QueueBrowser queueBrowser = _clientSession.createBrowser(_queue); long queueDepth = 0; @@ -477,19 +468,17 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase } catch (AMQException e) { + fail("Caught exception getting queue depth: " + e.getMessage()); } assertEquals("Session reports Queue depth not as expected", messages, queueDepth); - int msgCount = 0; - int failPoint = 0; failPoint = new Random().nextInt(messages) + 1; Enumeration msgs = queueBrowser.getEnumeration(); - while (msgs.hasMoreElements()) { msgs.nextElement(); @@ -536,5 +525,4 @@ public class QueueBrowserAutoAckTest extends FailoverBaseCase //Validate all messages still on Broker 1 validate(messages); } - } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/UTF8Test.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/UTF8Test.java index 81089a4dfc..fe929b4965 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/UTF8Test.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/UTF8Test.java @@ -34,9 +34,8 @@ import java.io.*; /** - * @author Apache Software Foundation - * This test makes sure that utf8 characters can be used for - * specifying exchange, queue name and routing key. + * This test makes sure that utf8 characters can be used for + * specifying exchange, queue name and routing key. * * those tests are related to qpid-1384 */ @@ -44,7 +43,6 @@ public class UTF8Test extends QpidBrokerTestCase { private static final Logger _logger = LoggerFactory.getLogger(UTF8Test.class); - public void testPlainEn() throws Exception { invoke("UTF8En"); @@ -56,7 +54,6 @@ public class UTF8Test extends QpidBrokerTestCase invoke("UTF8Jp"); } - private void invoke(String name) throws Exception { String path = System.getProperties().getProperty("QPID_HOME"); @@ -65,6 +62,7 @@ public class UTF8Test extends QpidBrokerTestCase runTest(in.readLine(), in.readLine(), in.readLine(), in.readLine()); in.close(); } + private void runTest(String exchangeName, String queueName, String routingKey, String data) throws Exception { _logger.info("Running test for exchange: " + exchangeName @@ -89,26 +87,17 @@ public class UTF8Test extends QpidBrokerTestCase private void declareQueue(String exch, String routkey, String qname) throws Exception { - Connection conn = new Connection(); - if (!_broker.equals(QpidBrokerTestCase.EXTERNAL) && !isBroker08()) - { - conn.connect("localhost", QpidBrokerTestCase.DEFAULT_PORT, "test", "guest", "guest",false); - } - else - { - throw new Exception("unsupported test " + - "configuration. broker: " + _broker + " version > 0.10 "+ !isBroker08() + " This test must be run on a local broker using protocol 0.10 or higher."); - } - Session sess = conn.createSession(0); - sess.exchangeDeclare(exch, "direct", null, null); - sess.queueDeclare(qname, null, null); - sess.exchangeBind(qname, exch, routkey, null); - sess.sync(); - conn.close(); + Connection conn = new Connection(); + conn.connect("localhost", QpidBrokerTestCase.DEFAULT_PORT, "test", "guest", "guest",false); + Session sess = conn.createSession(0); + sess.exchangeDeclare(exch, "direct", null, null); + sess.queueDeclare(qname, null, null); + sess.exchangeBind(qname, exch, routkey, null); + sess.sync(); + conn.close(); } - private Destination getDestination(String exch, String routkey, String qname) - throws Exception + private Destination getDestination(String exch, String routkey, String qname) throws Exception { Properties props = new Properties(); props.setProperty("destination.directUTF8Queue", |
