diff options
| author | Keith Wall <kwall@apache.org> | 2012-07-02 14:26:36 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2012-07-02 14:26:36 +0000 |
| commit | 220f542e8e00932cfcd3d4c4f7832c58b7fef4cd (patch) | |
| tree | 257838ce9fa6491e2333ef15fd0203c24bb23297 /qpid/java/perftests/src/main | |
| parent | 7d96daca11cde3df099c652b671e3bf8ab2627b6 (diff) | |
| download | qpid-python-220f542e8e00932cfcd3d4c4f7832c58b7fef4cd.tar.gz | |
QPID-4089: Add latency tests into java performance test framework
Applied patch from Oleksandr Rudyy <orudyy@gmail.com>.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1356250 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/perftests/src/main')
8 files changed, 275 insertions, 19 deletions
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java index 1b5e8276c2..368a25c929 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java @@ -20,13 +20,16 @@ package org.apache.qpid.disttest.client; +import java.util.ArrayList; import java.util.Date; +import java.util.List; import java.util.NavigableSet; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; @@ -53,12 +56,17 @@ public class ConsumerParticipant implements Participant private long _startTime; private volatile Exception _asyncMessageListenerException; + private List<Long> _messageLatencies; public ConsumerParticipant(final ClientJmsDelegate delegate, final CreateConsumerCommand command) { _jmsDelegate = delegate; _command = command; _resultFactory = new ParticipantResultFactory(); + if (command.isEvaluateLatency()) + { + _messageLatencies = new ArrayList<Long>(); + } } @Override @@ -105,7 +113,7 @@ public class ConsumerParticipant implements Participant numberOfMessagesSent, payloadSize, totalPayloadSize, - start, end); + start, end, _messageLatencies); return result; } @@ -130,25 +138,42 @@ public class ConsumerParticipant implements Participant */ private boolean processMessage(Message message) { - int messageCount = _totalNumberOfMessagesReceived.incrementAndGet(); - if (LOGGER.isTraceEnabled()) - { - LOGGER.trace("message " + messageCount + " received by " + this); - } - int messagePayloadSize = _jmsDelegate.calculatePayloadSizeFrom(message); - _allConsumedPayloadSizes.add(messagePayloadSize); - _totalPayloadSizeOfAllMessagesReceived.addAndGet(messagePayloadSize); - + int messageCount = message == null? _totalNumberOfMessagesReceived.get() : _totalNumberOfMessagesReceived.incrementAndGet() ; boolean batchEnabled = _command.getBatchSize() > 0; boolean batchComplete = batchEnabled && messageCount % _command.getBatchSize() == 0; - - if (!batchEnabled || batchComplete) + if (message != null) { - if (LOGGER.isTraceEnabled() && batchEnabled) + if (LOGGER.isTraceEnabled()) + { + LOGGER.trace("message " + messageCount + " received by " + this); + } + int messagePayloadSize = _jmsDelegate.calculatePayloadSizeFrom(message); + _allConsumedPayloadSizes.add(messagePayloadSize); + _totalPayloadSizeOfAllMessagesReceived.addAndGet(messagePayloadSize); + + if (_command.isEvaluateLatency()) + { + long mesageTimestamp; + try + { + mesageTimestamp = message.getJMSTimestamp(); + } + catch (JMSException e) + { + throw new DistributedTestException("Cannot get message timestamp!", e); + } + long latency = System.currentTimeMillis() - mesageTimestamp; + _messageLatencies.add(latency); + } + + if (!batchEnabled || batchComplete) { - LOGGER.trace("Committing: batch size " + _command.getBatchSize() ); + if (LOGGER.isTraceEnabled() && batchEnabled) + { + LOGGER.trace("Committing: batch size " + _command.getBatchSize() ); + } + _jmsDelegate.commitOrAcknowledgeMessage(message, _command.getSessionName()); } - _jmsDelegate.commitOrAcknowledgeMessage(message, _command.getSessionName()); } boolean reachedExpectedNumberOfMessages = _command.getNumberOfMessages() > 0 && messageCount >= _command.getNumberOfMessages(); diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ParticipantResultFactory.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ParticipantResultFactory.java index 7f6b96b87c..50c0a74ccd 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ParticipantResultFactory.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ParticipantResultFactory.java @@ -18,6 +18,7 @@ */ package org.apache.qpid.disttest.client; +import java.util.Collection; import java.util.Date; import org.apache.qpid.disttest.message.ConsumerParticipantResult; @@ -26,12 +27,24 @@ import org.apache.qpid.disttest.message.CreateParticpantCommand; import org.apache.qpid.disttest.message.CreateProducerCommand; import org.apache.qpid.disttest.message.ParticipantResult; import org.apache.qpid.disttest.message.ProducerParticipantResult; +import org.apache.qpid.disttest.results.aggregation.SeriesStatistics; public class ParticipantResultFactory { - public ConsumerParticipantResult createForConsumer(String participantName, String clientRegisteredName, CreateConsumerCommand command, int acknowledgeMode, int numberOfMessagesReceived, int payloadSize, long totalPayloadReceived, Date start, Date end) + public ConsumerParticipantResult createForConsumer(String participantName, String clientRegisteredName, + CreateConsumerCommand command, int acknowledgeMode, int numberOfMessagesReceived, int payloadSize, + long totalPayloadReceived, Date start, Date end) + { + return createForConsumer(participantName, clientRegisteredName, command, acknowledgeMode, numberOfMessagesReceived, + payloadSize, totalPayloadReceived, start, end, null); + } + + public ConsumerParticipantResult createForConsumer(String participantName, String clientRegisteredName, + CreateConsumerCommand command, int acknowledgeMode, int numberOfMessagesReceived, int payloadSize, + long totalPayloadReceived, Date start, Date end, Collection<Long> messageLatencies) { ConsumerParticipantResult consumerParticipantResult = new ConsumerParticipantResult(); + consumerParticipantResult.setMessageLatencies(messageLatencies); setTestProperties(consumerParticipantResult, command, participantName, clientRegisteredName, acknowledgeMode); setTestResultProperties(consumerParticipantResult, numberOfMessagesReceived, payloadSize, totalPayloadReceived, start, end); @@ -45,6 +58,11 @@ public class ParticipantResultFactory consumerParticipantResult.setTotalNumberOfConsumers(1); consumerParticipantResult.setTotalNumberOfProducers(0); + SeriesStatistics statistics = new SeriesStatistics(messageLatencies); + consumerParticipantResult.setAverageLatency(statistics.getAverage()); + consumerParticipantResult.setMinLatency(statistics.getMinimum()); + consumerParticipantResult.setMaxLatency(statistics.getMaximum()); + consumerParticipantResult.setLatencyStandardDeviation(statistics.getStandardDeviation()); return consumerParticipantResult; } diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ConsumerConfig.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ConsumerConfig.java index ed47e02667..8b4503b0ad 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ConsumerConfig.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ConsumerConfig.java @@ -10,6 +10,7 @@ public class ConsumerConfig extends ParticipantConfig private String _selector; private boolean _noLocal; private boolean _synchronous; + private boolean _evaluateLatency; // For Gson public ConsumerConfig() @@ -58,6 +59,7 @@ public class ConsumerConfig extends ParticipantConfig createConsumerCommand.setSelector(_selector); createConsumerCommand.setNoLocal(_noLocal); createConsumerCommand.setSynchronous(_synchronous); + createConsumerCommand.setEvaluateLatency(_evaluateLatency); return createConsumerCommand; } diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ConsumerParticipantResult.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ConsumerParticipantResult.java index f92e3ea538..566d4e2076 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ConsumerParticipantResult.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ConsumerParticipantResult.java @@ -25,6 +25,8 @@ import static org.apache.qpid.disttest.message.ParticipantAttribute.IS_SELECTOR; import static org.apache.qpid.disttest.message.ParticipantAttribute.IS_SYNCHRONOUS_CONSUMER; import static org.apache.qpid.disttest.message.ParticipantAttribute.IS_TOPIC; +import java.util.Collection; + public class ConsumerParticipantResult extends ParticipantResult { private boolean _topic; @@ -34,6 +36,12 @@ public class ConsumerParticipantResult extends ParticipantResult private boolean _noLocal; private boolean _synchronousConsumer; + private Collection<Long> _messageLatencies; + private long _minLatency; + private long _maxLatency; + private double _averageLatency; + private double _latencyStandardDeviation; + public ConsumerParticipantResult() { super(CommandType.CONSUMER_PARTICIPANT_RESULT); @@ -115,4 +123,59 @@ public class ConsumerParticipantResult extends ParticipantResult { return _topic; } + + public Collection<Long> getMessageLatencies() + { + return _messageLatencies; + } + + public void setMessageLatencies(Collection<Long> messageLatencies) + { + _messageLatencies = messageLatencies; + } + + @OutputAttribute(attribute=ParticipantAttribute.MIN_LATENCY) + public long getMinLatency() + { + return _minLatency; + } + + public void setMinLatency(long minLatency) + { + _minLatency = minLatency; + } + + @OutputAttribute(attribute=ParticipantAttribute.MAX_LATENCY) + public long getMaxLatency() + { + return _maxLatency; + } + + public void setMaxLatency(long maxLatency) + { + _maxLatency = maxLatency; + } + + @OutputAttribute(attribute=ParticipantAttribute.AVERAGE_LATENCY) + public double getAverageLatency() + { + return _averageLatency; + } + + public void setAverageLatency(double averageLatency) + { + _averageLatency = averageLatency; + } + + @OutputAttribute(attribute=ParticipantAttribute.LATENCY_STANDARD_DEVIATION) + public double getLatencyStandardDeviation() + { + return _latencyStandardDeviation; + } + + public void setLatencyStandardDeviation(double latencyStandardDeviation) + { + _latencyStandardDeviation = latencyStandardDeviation; + } + } diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateConsumerCommand.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateConsumerCommand.java index 678e428f94..68c21fbf83 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateConsumerCommand.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateConsumerCommand.java @@ -28,7 +28,7 @@ public class CreateConsumerCommand extends CreateParticpantCommand private boolean _noLocal; private boolean _synchronous; private long _receiveTimeout = 5000; - + private boolean _evaluateLatency; public CreateConsumerCommand() { @@ -105,4 +105,14 @@ public class CreateConsumerCommand extends CreateParticpantCommand { return _receiveTimeout; } + + public boolean isEvaluateLatency() + { + return _evaluateLatency; + } + + public void setEvaluateLatency(boolean evaluateLatency) + { + _evaluateLatency = evaluateLatency; + } } diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ParticipantAttribute.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ParticipantAttribute.java index ccc7c0d9fb..0644ec16a3 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ParticipantAttribute.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ParticipantAttribute.java @@ -54,7 +54,12 @@ public enum ParticipantAttribute TOTAL_PAYLOAD_PROCESSED("totalPayloadProcessedB"), THROUGHPUT("throughputKbPerS"), TIME_TAKEN("timeTakenMs"), - ERROR_MESSAGE("errorMessage"); + ERROR_MESSAGE("errorMessage"), + MIN_LATENCY("minLatency"), + MAX_LATENCY("maxLatency"), + AVERAGE_LATENCY("averageLatency"), + LATENCY_STANDARD_DEVIATION("latencyStandardDeviation") + ; private String _displayName; diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/ParticipantResultAggregator.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/ParticipantResultAggregator.java index 207d0131eb..dde717c71b 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/ParticipantResultAggregator.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/ParticipantResultAggregator.java @@ -23,6 +23,7 @@ import java.util.Date; import java.util.NavigableSet; import java.util.TreeSet; +import org.apache.qpid.disttest.message.ConsumerParticipantResult; import org.apache.qpid.disttest.message.ParticipantResult; public class ParticipantResultAggregator @@ -44,6 +45,8 @@ public class ParticipantResultAggregator private NavigableSet<Integer> _encounteredAcknowledgeMode = new TreeSet<Integer>(); private NavigableSet<String> _encountedTestNames = new TreeSet<String>(); + private SeriesStatistics _latencyStatistics = new SeriesStatistics(); + public ParticipantResultAggregator(Class<? extends ParticipantResult> targetClass, String aggregateResultName) { _aggregatedResultName = aggregateResultName; @@ -56,12 +59,31 @@ public class ParticipantResultAggregator { rollupConstantAttributes(result); computeVariableAttributes(result); + if (result instanceof ConsumerParticipantResult) + { + ConsumerParticipantResult consumerParticipantResult = (ConsumerParticipantResult)result; + _latencyStatistics.addMessageLatencies(consumerParticipantResult.getMessageLatencies()); + _latencyStatistics.aggregate(); + } } } public ParticipantResult getAggregatedResult() { - ParticipantResult aggregatedResult = new ParticipantResult(_aggregatedResultName); + ParticipantResult aggregatedResult; + if (_targetClass == ConsumerParticipantResult.class) + { + ConsumerParticipantResult consumerParticipantResult = new ConsumerParticipantResult(_aggregatedResultName); + consumerParticipantResult.setAverageLatency(_latencyStatistics.getAverage()); + consumerParticipantResult.setMinLatency(_latencyStatistics.getMinimum()); + consumerParticipantResult.setMaxLatency(_latencyStatistics.getMaximum()); + consumerParticipantResult.setLatencyStandardDeviation(_latencyStatistics.getStandardDeviation()); + aggregatedResult = consumerParticipantResult; + } + else + { + aggregatedResult = new ParticipantResult(_aggregatedResultName); + } setRolledUpConstantAttributes(aggregatedResult); setComputedVariableAttributes(aggregatedResult); diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/SeriesStatistics.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/SeriesStatistics.java new file mode 100644 index 0000000000..b93c210473 --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/SeriesStatistics.java @@ -0,0 +1,111 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.disttest.results.aggregation; + +import java.util.Collection; +import java.util.concurrent.CopyOnWriteArrayList; + +public class SeriesStatistics +{ + private long _minValue; + private long _maxValue; + private double _mean; + private double _standardDeviation; + private Collection<Long> _series = new CopyOnWriteArrayList<Long>(); + + public SeriesStatistics() + { + super(); + } + + public SeriesStatistics(Collection<Long> messageLatencies) + { + setMessageLatencies(messageLatencies); + } + + public void addMessageLatencies(Collection<Long> messageLatencies) + { + if (messageLatencies != null) + { + _series.addAll(messageLatencies); + } + } + + public void setMessageLatencies(Collection<Long> messageLatencies) + { + _series = messageLatencies; + aggregate(); + } + + public void aggregate() + { + if (_series != null && _series.size() > 0) + { + long minLatency = Long.MAX_VALUE; + long maxLatency = Long.MIN_VALUE; + long totalLatency = 0; + for (Long latency : _series) + { + totalLatency += latency; + minLatency = Math.min(minLatency, latency); + maxLatency = Math.max(maxLatency, latency); + } + _mean = ((double) totalLatency) / (double) _series.size(); + _minValue = minLatency; + _maxValue = maxLatency; + double sum = 0; + for (Long latency : _series) + { + double diff = latency - _mean; + sum += diff * diff; + } + long size = _series.size() == 1 ? 1: _series.size() - 1; + _standardDeviation = Math.sqrt(sum / (double) size); + } + else + { + _mean = 0; + _minValue = 0; + _maxValue = 0; + _standardDeviation = 0; + } + } + + public long getMinimum() + { + return _minValue; + } + + public long getMaximum() + { + return _maxValue; + } + + public double getAverage() + { + return _mean; + } + + public double getStandardDeviation() + { + return _standardDeviation; + } +} |
