summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2021-01-13 18:03:15 +0100
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2021-01-13 18:03:15 +0100
commit7a2415acfbd97bf509eb1eb97e98c37b51d10018 (patch)
tree3d80a86d5e00056bad1a3950a2ff592594005dd1
parentb3b77897cd3495fb35abeae278d3c231f23aadf5 (diff)
downloadrabbitmq-server-git-7a2415acfbd97bf509eb1eb97e98c37b51d10018.tar.gz
Test stream prometheus
-rw-r--r--deps/rabbitmq_stream_prometheus/src/collectors/prometheus_rabbitmq_stream_collector.erl28
-rw-r--r--deps/rabbitmq_stream_prometheus/test/prometheus_http_SUITE_data/Makefile2
-rw-r--r--deps/rabbitmq_stream_prometheus/test/prometheus_http_SUITE_data/src/test/java/com/rabbitmq/stream/MetricsUtils.java211
-rw-r--r--deps/rabbitmq_stream_prometheus/test/prometheus_http_SUITE_data/src/test/java/com/rabbitmq/stream/PrometheusHttpTest.java338
-rw-r--r--deps/rabbitmq_stream_prometheus/test/prometheus_http_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java183
5 files changed, 483 insertions, 279 deletions
diff --git a/deps/rabbitmq_stream_prometheus/src/collectors/prometheus_rabbitmq_stream_collector.erl b/deps/rabbitmq_stream_prometheus/src/collectors/prometheus_rabbitmq_stream_collector.erl
index 3e1590a46c..667bd0cdfb 100644
--- a/deps/rabbitmq_stream_prometheus/src/collectors/prometheus_rabbitmq_stream_collector.erl
+++ b/deps/rabbitmq_stream_prometheus/src/collectors/prometheus_rabbitmq_stream_collector.erl
@@ -28,36 +28,27 @@
-define(METRIC_NAME_PREFIX, "rabbitmq_stream_").
-define(METRICS_RAW,
- [% { ETS table, [ {index, conversion, Prometheus metrics name, type, help, key} ] }
+ [% { ETS table, [ {index, Prometheus metrics name, type, help, key} ] }
{?TABLE_PUBLISHER,
- [{2,
- undefined,
- publishers,
- gauge,
- "Number of publishers",
- publishers},
+ [{2, publishers, gauge, "Number of publishers", publishers},
{2,
- undefined,
publishers_messages_published_total,
counter,
"Total number of messages published to streams",
published},
{2,
- undefined,
publishers_messages_confirmed_total,
counter,
"Total number of messages confirmed",
confirmed},
{2,
- undefined,
publishers_messages_errored_total,
counter,
"Total number of messages errored",
errored}]},
{?TABLE_CONSUMER,
- [{2, undefined, consumers, gauge, "Number of consumers", consumers},
+ [{2, consumers, gauge, "Number of consumers", consumers},
{2,
- undefined,
consumers_messages_consumed_total,
counter,
"Total number of messages from streams",
@@ -108,23 +99,14 @@ get_data(Table, _) ->
mf(Callback, Contents, Data) ->
[begin
- Fun = case Conversion of
- undefined ->
- fun(D) -> proplists:get_value(Key, element(Index, D))
- end;
- BaseUnitConversionFactor ->
- fun(D) ->
- proplists:get_value(Key, element(Index, D))
- / BaseUnitConversionFactor
- end
- end,
+ Fun = fun(D) -> proplists:get_value(Key, element(Index, D)) end,
Callback(prometheus_model_helpers:create_mf(?METRIC_NAME(Name),
Help,
catch_boolean(Type),
?MODULE,
{Type, Fun, Data}))
end
- || {Index, Conversion, Name, Type, Help, Key} <- Contents].
+ || {Index, Name, Type, Help, Key} <- Contents].
collect_metrics(_Name, {Type, Fun, Items}) ->
[metric(Type, labels(Item), Fun(Item)) || Item <- Items].
diff --git a/deps/rabbitmq_stream_prometheus/test/prometheus_http_SUITE_data/Makefile b/deps/rabbitmq_stream_prometheus/test/prometheus_http_SUITE_data/Makefile
index 16677504bc..fdd09d6a72 100644
--- a/deps/rabbitmq_stream_prometheus/test/prometheus_http_SUITE_data/Makefile
+++ b/deps/rabbitmq_stream_prometheus/test/prometheus_http_SUITE_data/Makefile
@@ -1,7 +1,7 @@
export PATH :=$(CURDIR):$(PATH)
HOSTNAME := $(shell hostname)
MVN_FLAGS += -Dstream.port=$(STREAM_PORT) \
- -Dmanagement.port=$(MANAGEMENT_PORT)
+ -Dprometheus.port=$(PROMETHEUS_PORT)
.PHONY: tests clean
diff --git a/deps/rabbitmq_stream_prometheus/test/prometheus_http_SUITE_data/src/test/java/com/rabbitmq/stream/MetricsUtils.java b/deps/rabbitmq_stream_prometheus/test/prometheus_http_SUITE_data/src/test/java/com/rabbitmq/stream/MetricsUtils.java
new file mode 100644
index 0000000000..40616cf56c
--- /dev/null
+++ b/deps/rabbitmq_stream_prometheus/test/prometheus_http_SUITE_data/src/test/java/com/rabbitmq/stream/MetricsUtils.java
@@ -0,0 +1,211 @@
+// The contents of this file are subject to the Mozilla Public License
+// Version 2.0 (the "License"); you may not use this file except in
+// compliance with the License. You may obtain a copy of the License
+// at https://www.mozilla.org/en-US/MPL/2.0/
+//
+// Software distributed under the License is distributed on an "AS IS"
+// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+// the License for the specific language governing rights and
+// limitations under the License.
+//
+// The Original Code is RabbitMQ.
+//
+// The Initial Developer of the Original Code is Pivotal Software, Inc.
+// Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved.
+//
+
+package com.rabbitmq.stream;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.BiConsumer;
+
+public class MetricsUtils {
+
+ static final String METRIC_PREFIX = "rabbitmq_stream_";
+ static final String METRIC_PUBLISHERS = "publishers";
+ static final String METRIC_PUBLISHERS_PUBLISHED = "publishers_messages_published_total";
+ static final String METRIC_PUBLISHERS_CONFIRMED = "publishers_messages_confirmed_total";
+ static final String METRIC_PUBLISHERS_ERRORED = "publishers_messages_errored_total";
+ static final String METRIC_CONSUMERS = "consumers";
+ static final String METRIC_CONSUMERS_CONSUMED = "consumers_messages_consumed_total";
+ static final List<String> METRICS =
+ Collections.unmodifiableList(
+ Arrays.asList(
+ METRIC_PUBLISHERS,
+ METRIC_PUBLISHERS_PUBLISHED,
+ METRIC_PUBLISHERS_CONFIRMED,
+ METRIC_PUBLISHERS_ERRORED,
+ METRIC_CONSUMERS,
+ METRIC_CONSUMERS_CONSUMED));
+
+ static Metrics parseMetrics(String content) throws IOException {
+ Metrics metrics = new Metrics();
+ try (BufferedReader reader = new BufferedReader(new StringReader(content))) {
+ String line;
+ String type = null, name = null;
+ Metric metric = null;
+ while ((line = reader.readLine()) != null) {
+ if (line.trim().isEmpty()
+ || !line.contains(METRIC_PREFIX)
+ || line.contains("ct_rabbitmq_stream_prometheus")) {
+ // empty line, non-stream metrics,
+ // or line containing the name of the erlang node, which is the name of the test suite
+ // the latter shows up in some metrics
+ continue;
+ }
+ if (line.startsWith("# TYPE ")) {
+ String[] nameType = line.replace("# TYPE ", "").split(" ");
+ name = nameType[0];
+ type = nameType[1];
+ } else if (line.startsWith("# HELP ")) {
+ String help = line.replace("# HELP ", "").replace(name + " ", "");
+ metric = new Metric(name, type, help);
+ metrics.add(metric);
+ } else if (line.startsWith(name)) {
+ Map<String, String> labels = Collections.emptyMap();
+ if (line.contains("{")) {
+ String l = line.substring(line.indexOf("{") + 1, line.indexOf("}"));
+ labels =
+ Arrays.stream(l.split(","))
+ .map(label -> label.trim().split("="))
+ .collect(
+ () -> new HashMap<>(),
+ (acc, keyValue) -> acc.put(keyValue[0], keyValue[1].replace("\"", "")),
+ (BiConsumer<Map<String, String>, Map<String, String>>)
+ (stringStringHashMap, stringStringHashMap2) ->
+ stringStringHashMap.putAll(stringStringHashMap2));
+ }
+ int value;
+ try {
+ value = Integer.valueOf(line.split(" ")[1]);
+ } catch (NumberFormatException e) {
+ value = 0;
+ }
+ metric.add(new MetricValue(value, labels));
+ } else {
+ throw new IllegalStateException("Cannot parse line: " + line);
+ }
+ }
+ }
+
+ return metrics;
+ }
+
+ static class MetricValue {
+
+ final int value;
+ final Map<String, String> labels;
+
+ MetricValue(int value, Map<String, String> labels) {
+ this.value = value;
+ this.labels = labels == null ? Collections.emptyMap() : labels;
+ }
+
+ public int value() {
+ return value;
+ }
+
+ @Override
+ public String toString() {
+ return "MetricValue{" + "value=" + value + ", labels=" + labels + '}';
+ }
+ }
+
+ static class Metric {
+
+ final String name;
+ final String type;
+ final String help;
+ final List<MetricValue> values = new ArrayList<>();
+
+ Metric(String name, String type, String help) {
+ this.name = name.replace(METRIC_PREFIX, "");
+ this.type = type;
+ this.help = help;
+ }
+
+ void add(MetricValue value) {
+ values.add(value);
+ }
+
+ boolean isGauge() {
+ return "gauge".equals(type);
+ }
+
+ boolean isCounter() {
+ return "counter".equals(type);
+ }
+
+ int value() {
+ if (values.size() != 1) {
+ throw new IllegalStateException();
+ }
+ return values.get(0).value;
+ }
+
+ public List<MetricValue> values() {
+ return values;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Metric metric = (Metric) o;
+ return name.equals(metric.name);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name);
+ }
+
+ @Override
+ public String toString() {
+ return "Metric{"
+ + "name='"
+ + name
+ + '\''
+ + ", type='"
+ + type
+ + '\''
+ + ", help='"
+ + help
+ + '\''
+ + ", values="
+ + values
+ + '}';
+ }
+ }
+
+ static class Metrics {
+
+ final Map<String, Metric> metrics = new HashMap<>();
+
+ void add(Metric metric) {
+ this.metrics.put(metric.name, metric);
+ }
+
+ Metric get(String name) {
+ return metrics.get(name);
+ }
+
+ @Override
+ public String toString() {
+ return "Metrics{" + "metrics=" + metrics + '}';
+ }
+ }
+}
diff --git a/deps/rabbitmq_stream_prometheus/test/prometheus_http_SUITE_data/src/test/java/com/rabbitmq/stream/PrometheusHttpTest.java b/deps/rabbitmq_stream_prometheus/test/prometheus_http_SUITE_data/src/test/java/com/rabbitmq/stream/PrometheusHttpTest.java
index 34bdd72e97..40914361a0 100644
--- a/deps/rabbitmq_stream_prometheus/test/prometheus_http_SUITE_data/src/test/java/com/rabbitmq/stream/PrometheusHttpTest.java
+++ b/deps/rabbitmq_stream_prometheus/test/prometheus_http_SUITE_data/src/test/java/com/rabbitmq/stream/PrometheusHttpTest.java
@@ -11,38 +11,40 @@
// The Original Code is RabbitMQ.
//
// The Initial Developer of the Original Code is Pivotal Software, Inc.
-// Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
+// Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved.
//
package com.rabbitmq.stream;
-import com.rabbitmq.stream.TestUtils.ClientFactory;
-import java.io.BufferedReader;
+import static com.rabbitmq.stream.MetricsUtils.*;
+import static com.rabbitmq.stream.MetricsUtils.parseMetrics;
+import static com.rabbitmq.stream.TestUtils.*;
+import static com.rabbitmq.stream.TestUtils.waitUntil;
+import static java.util.stream.Collectors.toList;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.condition.AllOf.allOf;
+
+import com.rabbitmq.stream.MetricsUtils.Metrics;
+import com.rabbitmq.stream.TestUtils.CallableSupplier;
import java.io.IOException;
-import java.io.StringReader;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.function.BiConsumer;
-import java.util.function.BiFunction;
-import java.util.function.BinaryOperator;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.ResponseBody;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
-@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
public class PrometheusHttpTest {
static OkHttpClient httpClient = new OkHttpClient.Builder().build();
- ClientFactory cf;
- String stream;
static String get(String endpoint) throws IOException {
return get(httpClient, endpoint);
@@ -62,141 +64,221 @@ public class PrometheusHttpTest {
return "http://localhost:" + TestUtils.prometheusPort() + "/metrics" + endpoint;
}
- static Metrics metrics() throws IOException {
- return parseMetrics(get(""));
+ @ParameterizedTest
+ @CsvSource({
+ METRIC_PUBLISHERS + ",true",
+ METRIC_PUBLISHERS_PUBLISHED + ",false",
+ METRIC_PUBLISHERS_CONFIRMED + ",false",
+ METRIC_PUBLISHERS_ERRORED + ",false",
+ METRIC_CONSUMERS + ",true",
+ METRIC_CONSUMERS_CONSUMED + ",false"
+ })
+ void aggregatedMetricsWithNoConnectionShouldReturnZero(String name, boolean isGauge)
+ throws Exception {
+ Metrics metrics = metrics();
+ assertThat(metrics.metrics).hasSameSizeAs(METRICS);
+ Metric metric = metrics.get(name);
+ assertThat(metric).isNotNull().has(help()).is(zero()).is(isGauge ? gauge() : counter());
}
- static Metrics parseMetrics(String content) throws IOException {
- Metrics metrics = new Metrics();
- try (BufferedReader reader = new BufferedReader(new StringReader(content))) {
- String line;
- String type = null, name = null;
- Metric metric = null;
- while ((line = reader.readLine()) != null) {
- if (line.trim().isEmpty() || !line.contains("rabbitmq_stream_")) {
- continue;
- }
- if (line.startsWith("# TYPE ")) {
- String[] nameType = line.replace("# TYPE ", "").split(" ");
- name = nameType[0];
- type = nameType[1];
- } else if (line.startsWith("# HELP ")) {
- String help = line.replace("# HELP ", "").replace(name + " ", "");
- metric = new Metric(name, type, help);
- metrics.add(metric);
- } else if (line.startsWith(name)) {
- Map<String, String> labels = Collections.emptyMap();
- if (line.contains("{")) {
- String l = line.substring(line.indexOf("{"), line.indexOf("}"));
- labels = Arrays.stream(l.split(",")).map(label -> label.trim().split("="))
- .collect(() -> new HashMap<>(),
- (acc, keyValue) -> acc.put(keyValue[0], keyValue[1].replace("\"", "")),
- (BiConsumer<Map<String, String>, Map<String, String>>) (stringStringHashMap, stringStringHashMap2) -> stringStringHashMap.putAll(stringStringHashMap2));
-
- }
- int value;
- try {
- value = Integer.valueOf(line.split(" ")[1]);
- } catch (NumberFormatException e) {
- value = 0;
- }
- metric.add(new MetricValue(value, labels));
- } else {
- throw new IllegalStateException("Cannot parse line: " + line);
- }
- }
- }
-
- return metrics;
+ @Test
+ void perObjectMetricsWithNoConnectionShouldReturnNoValue() throws Exception {
+ Metrics metrics = metricsPerObject();
+ METRICS.forEach(
+ name -> {
+ Metric metric = metrics.get(name);
+ assertThat(metric).isNotNull().has(noValue());
+ });
}
@Test
- void aggregatedMetricsWithNoConnectionShouldReturnZero() throws Exception {
- Metrics metrics = metrics();
- System.out.println(metrics);
- }
+ void aggregatedMetricsWithPublishersAndConsumersShouldReturnCorrectCounts(TestInfo info)
+ throws Exception {
+ List<String> streams =
+ IntStream.range(0, 5).mapToObj(i -> TestUtils.streamName(info)).collect(toList());
+ int producersCount = streams.size();
+ int consumersCount = streams.size() * 2;
+ int messagesByProducer = 10_000;
+ int messageCount = producersCount * messagesByProducer;
- static class MetricValue {
+ Environment env = Environment.builder().port(TestUtils.streamPort()).build();
+ List<Producer> producers = Collections.emptyList();
+ List<Consumer> consumers = Collections.emptyList();
+ CallableSupplier<Metrics> metricsCall = () -> metrics();
+ try {
+ streams.forEach(stream -> env.streamCreator().stream(stream).create());
- private final int value;
- private final Map<String, String> labels;
+ producers =
+ IntStream.range(0, producersCount)
+ .mapToObj(i -> env.producerBuilder().stream(streams.get(i % streams.size())).build())
+ .collect(toList());
- MetricValue(int value, Map<String, String> labels) {
- this.value = value;
- this.labels = labels == null ? Collections.emptyMap() : labels;
- }
+ waitUntil(() -> metricsCall.get().get(METRIC_PUBLISHERS).value() == producersCount);
- @Override
- public String toString() {
- return "MetricValue{" +
- "value=" + value +
- ", labels=" + labels +
- '}';
- }
- }
+ CountDownLatch confirmedLatch = new CountDownLatch(messageCount);
+ ConfirmationHandler confirmationHandler = status -> confirmedLatch.countDown();
+ producers.forEach(
+ producer -> {
+ IntStream.range(0, messagesByProducer)
+ .forEach(
+ i ->
+ producer.send(
+ producer.messageBuilder().addData("".getBytes()).build(),
+ confirmationHandler));
+ });
- static class Metric {
+ assertThat(confirmedLatch.await(10, TimeUnit.SECONDS)).isTrue();
- private final String name;
- private final String type;
- private final String help;
- private final List<MetricValue> values = new ArrayList<>();
+ waitUntil(() -> metricsCall.get().get(METRIC_PUBLISHERS_CONFIRMED).value() == messageCount);
- Metric(String name, String type, String help) {
- this.name = name;
- this.type = type;
- this.help = help;
- }
+ Metrics metrics = metricsCall.get();
+ assertThat(metrics.get(METRIC_PUBLISHERS_PUBLISHED)).has(value(messageCount));
+ assertThat(metrics.get(METRIC_PUBLISHERS_CONFIRMED)).has(value(messageCount));
+ assertThat(metrics.get(METRIC_PUBLISHERS_ERRORED)).is(zero());
+ assertThat(metrics.get(METRIC_CONSUMERS)).is(zero());
+ assertThat(metrics.get(METRIC_CONSUMERS_CONSUMED)).is(zero());
- void add(MetricValue value) {
- values.add(value);
- }
+ int consumedMessageCount = consumersCount * messagesByProducer;
+ CountDownLatch consumedLatch = new CountDownLatch(consumedMessageCount);
+ consumers =
+ IntStream.range(0, consumersCount)
+ .mapToObj(
+ i ->
+ env.consumerBuilder().stream(streams.get(i % streams.size()))
+ .messageHandler((ctx, msg) -> consumedLatch.countDown())
+ .build())
+ .collect(toList());
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- Metric metric = (Metric) o;
- return name.equals(metric.name);
- }
+ assertThat(consumedLatch.await(10, TimeUnit.SECONDS)).isTrue();
- @Override
- public int hashCode() {
- return Objects.hash(name);
- }
+ waitUntil(
+ () -> metricsCall.get().get(METRIC_CONSUMERS_CONSUMED).value() == consumedMessageCount);
+
+ metrics = metricsCall.get();
+ assertThat(metrics.get(METRIC_CONSUMERS)).has(value(consumersCount));
+ assertThat(metrics.get(METRIC_CONSUMERS_CONSUMED)).has(value(consumedMessageCount));
- @Override
- public String toString() {
- return "Metric{" +
- "name='" + name + '\'' +
- ", type='" + type + '\'' +
- ", help='" + help + '\'' +
- ", values=" + values +
- '}';
+ } finally {
+ producers.forEach(producer -> producer.close());
+ consumers.forEach(consumer -> consumer.close());
+ streams.forEach(stream -> env.deleteStream(stream));
+ env.close();
}
}
- static class Metrics {
+ @Test
+ void perObjectMetricsWithPublishersAndConsumersShouldReturnCorrectCounts(TestInfo info)
+ throws Exception {
+ List<String> streams =
+ IntStream.range(0, 5).mapToObj(i -> TestUtils.streamName(info)).collect(toList());
+ int producersCount = streams.size();
+ int consumersCount = streams.size() * 2;
+ int messagesByProducer = 10_000;
+ int messageCount = producersCount * messagesByProducer;
- private final Map<String, Metric> metrics = new HashMap<>();
+ Environment env = Environment.builder().port(TestUtils.streamPort()).build();
+ List<Producer> producers = Collections.emptyList();
+ List<Consumer> consumers = Collections.emptyList();
+ CallableSupplier<Metrics> metricsCall = () -> metricsPerObject();
+ try {
+ streams.forEach(stream -> env.streamCreator().stream(stream).create());
- void add(Metric metric) {
- this.metrics.put(metric.name, metric);
- }
+ producers =
+ IntStream.range(0, producersCount)
+ .mapToObj(i -> env.producerBuilder().stream(streams.get(i % streams.size())).build())
+ .collect(toList());
- Metric get(String name) {
- return metrics.get(name);
- }
+ CountDownLatch confirmedLatch = new CountDownLatch(messageCount);
+ ConfirmationHandler confirmationHandler = status -> confirmedLatch.countDown();
+ producers.forEach(
+ producer -> {
+ IntStream.range(0, messagesByProducer)
+ .forEach(
+ i ->
+ producer.send(
+ producer.messageBuilder().addData("".getBytes()).build(),
+ confirmationHandler));
+ });
+
+ assertThat(confirmedLatch.await(10, TimeUnit.SECONDS)).isTrue();
+
+ waitUntil(
+ () ->
+ metricsCall.get().get(METRIC_PUBLISHERS_CONFIRMED).values().stream()
+ .mapToInt(MetricValue::value)
+ .sum()
+ == messageCount);
+
+ Metrics metrics = metricsCall.get();
+ assertThat(metrics.get(METRIC_PUBLISHERS)).has(noValue());
+ assertThat(metrics.get(METRIC_PUBLISHERS_PUBLISHED))
+ .has(valueCount(producersCount))
+ .has(valuesWithLabels("vhost", "queue", "connection", "id"))
+ .has(
+ allOf(
+ streams.stream()
+ .map(s -> value("queue", s, messagesByProducer))
+ .collect(toList())));
+ assertThat(metrics.get(METRIC_PUBLISHERS_CONFIRMED))
+ .has(valueCount(producersCount))
+ .has(valuesWithLabels("vhost", "queue", "connection", "id"))
+ .has(
+ allOf(
+ streams.stream()
+ .map(s -> value("queue", s, messagesByProducer))
+ .collect(toList())));
+ assertThat(metrics.get(METRIC_PUBLISHERS_ERRORED))
+ .has(valueCount(producersCount))
+ .has(valuesWithLabels("vhost", "queue", "connection", "id"))
+ .has(allOf(streams.stream().map(s -> value("queue", s, 0)).collect(toList())));
+ assertThat(metrics.get(METRIC_CONSUMERS)).has(noValue());
+ assertThat(metrics.get(METRIC_CONSUMERS_CONSUMED)).has(noValue());
+
+ int consumedMessageCount = consumersCount * messagesByProducer;
+ CountDownLatch consumedLatch = new CountDownLatch(consumedMessageCount);
+ consumers =
+ IntStream.range(0, consumersCount)
+ .mapToObj(
+ i ->
+ env.consumerBuilder().stream(streams.get(i % streams.size()))
+ .messageHandler((ctx, msg) -> consumedLatch.countDown())
+ .build())
+ .collect(toList());
+
+ assertThat(consumedLatch.await(10, TimeUnit.SECONDS)).isTrue();
- @Override
- public String toString() {
- return "Metrics{" +
- "metrics=" + metrics +
- '}';
+ waitUntil(
+ () ->
+ metricsCall.get().get(METRIC_CONSUMERS_CONSUMED).values().stream()
+ .mapToInt(MetricValue::value)
+ .sum()
+ == consumedMessageCount);
+
+ metrics = metricsCall.get();
+ assertThat(metrics.get(METRIC_CONSUMERS)).has(noValue());
+ assertThat(metrics.get(METRIC_CONSUMERS_CONSUMED))
+ .has(valueCount(consumersCount))
+ .has(valuesWithLabels("vhost", "queue", "connection", "id"))
+ .has(
+ allOf(
+ streams.stream()
+ .flatMap(s -> Stream.of(s, s))
+ .map(s -> value("queue", s, messagesByProducer))
+ .collect(toList())));
+
+ } finally {
+ producers.forEach(producer -> producer.close());
+ consumers.forEach(consumer -> consumer.close());
+ streams.forEach(stream -> env.deleteStream(stream));
+ env.close();
}
}
+
+ static Metrics metrics() throws IOException {
+ return parseMetrics(get(""));
+ }
+
+ static Metrics metricsPerObject() throws IOException {
+ return parseMetrics(get("/per-object"));
+ }
}
diff --git a/deps/rabbitmq_stream_prometheus/test/prometheus_http_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java b/deps/rabbitmq_stream_prometheus/test/prometheus_http_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java
index 08f005f815..b911dd8ddd 100644
--- a/deps/rabbitmq_stream_prometheus/test/prometheus_http_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java
+++ b/deps/rabbitmq_stream_prometheus/test/prometheus_http_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java
@@ -11,32 +11,21 @@
// The Original Code is RabbitMQ.
//
// The Initial Developer of the Original Code is Pivotal Software, Inc.
-// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
+// Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved.
//
package com.rabbitmq.stream;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.fail;
-import com.rabbitmq.stream.impl.Client;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import java.lang.reflect.Field;
+import com.rabbitmq.stream.MetricsUtils.Metric;
import java.lang.reflect.Method;
import java.time.Duration;
-import java.util.Set;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import okhttp3.Authenticator;
-import okhttp3.Credentials;
+import org.assertj.core.api.Condition;
import org.junit.jupiter.api.TestInfo;
-import org.junit.jupiter.api.extension.AfterAllCallback;
-import org.junit.jupiter.api.extension.AfterEachCallback;
-import org.junit.jupiter.api.extension.BeforeAllCallback;
-import org.junit.jupiter.api.extension.BeforeEachCallback;
-import org.junit.jupiter.api.extension.ExtensionContext;
public class TestUtils {
@@ -76,104 +65,15 @@ public class TestUtils {
boolean getAsBoolean() throws Exception;
}
- static class StreamTestInfrastructureExtension
- implements BeforeAllCallback, AfterAllCallback, BeforeEachCallback, AfterEachCallback {
-
- private static final ExtensionContext.Namespace NAMESPACE =
- ExtensionContext.Namespace.create(StreamTestInfrastructureExtension.class);
-
- private static ExtensionContext.Store store(ExtensionContext extensionContext) {
- return extensionContext.getRoot().getStore(NAMESPACE);
- }
-
- private static EventLoopGroup eventLoopGroup(ExtensionContext context) {
- return (EventLoopGroup) store(context).get("nettyEventLoopGroup");
- }
-
- @Override
- public void beforeAll(ExtensionContext context) {
- store(context).put("nettyEventLoopGroup", new NioEventLoopGroup());
- }
-
- @Override
- public void beforeEach(ExtensionContext context) throws Exception {
- try {
- Field streamField =
- context.getTestInstance().get().getClass().getDeclaredField("eventLoopGroup");
- streamField.setAccessible(true);
- streamField.set(context.getTestInstance().get(), eventLoopGroup(context));
- } catch (NoSuchFieldException e) {
-
- }
- try {
- Field streamField = context.getTestInstance().get().getClass().getDeclaredField("stream");
- streamField.setAccessible(true);
- String stream = streamName(context);
- streamField.set(context.getTestInstance().get(), stream);
- Client client =
- new Client(
- new Client.ClientParameters()
- .eventLoopGroup(eventLoopGroup(context))
- .port(streamPort()));
- Client.Response response = client.create(stream);
- assertThat(response.isOk()).isTrue();
- client.close();
- store(context).put("testMethodStream", stream);
- } catch (NoSuchFieldException e) {
-
- }
-
- for (Field declaredField : context.getTestInstance().get().getClass().getDeclaredFields()) {
- if (declaredField.getType().equals(ClientFactory.class)) {
- declaredField.setAccessible(true);
- ClientFactory clientFactory = new ClientFactory(eventLoopGroup(context));
- declaredField.set(context.getTestInstance().get(), clientFactory);
- store(context).put("testClientFactory", clientFactory);
- break;
- }
- }
- }
-
- @Override
- public void afterEach(ExtensionContext context) throws Exception {
- try {
- Field streamField = context.getTestInstance().get().getClass().getDeclaredField("stream");
- streamField.setAccessible(true);
- String stream = (String) streamField.get(context.getTestInstance().get());
- Client client =
- new Client(
- new Client.ClientParameters()
- .eventLoopGroup(eventLoopGroup(context))
- .port(streamPort()));
- Client.Response response = client.delete(stream);
- assertThat(response.isOk()).isTrue();
- client.close();
- store(context).remove("testMethodStream");
- } catch (NoSuchFieldException e) {
-
- }
-
- ClientFactory clientFactory = (ClientFactory) store(context).get("testClientFactory");
- if (clientFactory != null) {
- clientFactory.close();
- }
- }
-
- @Override
- public void afterAll(ExtensionContext context) throws Exception {
- EventLoopGroup eventLoopGroup = eventLoopGroup(context);
- eventLoopGroup.shutdownGracefully(1, 10, SECONDS).get(10, SECONDS);
- }
+ @FunctionalInterface
+ interface CallableSupplier<T> {
+ T get() throws Exception;
}
static String streamName(TestInfo info) {
return streamName(info.getTestClass().get(), info.getTestMethod().get());
}
- private static String streamName(ExtensionContext context) {
- return streamName(context.getTestInstance().get().getClass(), context.getTestMethod().get());
- }
-
private static String streamName(Class<?> testClass, Method testMethod) {
String uuid = UUID.randomUUID().toString();
return String.format(
@@ -181,29 +81,58 @@ public class TestUtils {
testClass.getSimpleName(), testMethod.getName(), uuid.substring(uuid.length() / 2));
}
- static class ClientFactory {
+ static Condition<Metric> gauge() {
+ return new Condition<>(m -> m.isGauge(), "should be a gauge");
+ }
- private final EventLoopGroup eventLoopGroup;
- private final Set<Client> clients = ConcurrentHashMap.newKeySet();
+ static Condition<Metric> counter() {
+ return new Condition<>(m -> m.isCounter(), "should be a counter");
+ }
- public ClientFactory(EventLoopGroup eventLoopGroup) {
- this.eventLoopGroup = eventLoopGroup;
- }
+ static Condition<Metric> help() {
+ return new Condition<>(m -> m.help != null, "should have a help description");
+ }
- public Client get() {
- return get(new Client.ClientParameters());
- }
+ static Condition<Metric> zero() {
+ return new Condition<>(
+ m -> m.values.size() == 1 && m.values.get(0).value == 0, "should have one metric at 0");
+ }
- public Client get(Client.ClientParameters parameters) {
- Client client = new Client(parameters.eventLoopGroup(eventLoopGroup).port(streamPort()));
- clients.add(client);
- return client;
- }
+ static Condition<Metric> noValue() {
+ return new Condition<>(m -> m.values.isEmpty(), "should have no value");
+ }
- private void close() {
- for (Client c : clients) {
- c.close();
- }
- }
+ static Condition<Metric> value(int expected) {
+ return new Condition<>(m -> m.value() == expected, "should have value " + expected);
+ }
+
+ static Condition<Metric> valueCount(int expected) {
+ return new Condition<>(m -> m.values.size() == expected, "should have " + expected + " values");
+ }
+
+ static Condition<Metric> valuesWithLabels(String... expectedLabels) {
+ Collection<String> expected = Arrays.asList(expectedLabels);
+ return new Condition<>(
+ m ->
+ m.values().stream()
+ .map(v -> v.labels.keySet())
+ .map(labels -> labels.containsAll(expected))
+ .reduce(true, (b1, b2) -> b1 && b2),
+ "should have values with labels " + String.join(",", expected));
+ }
+
+ static Condition<Metric> value(String labelKey, String labelValue, int value) {
+ return new Condition<>(
+ m ->
+ m.values().stream()
+ .filter(v -> v.labels.containsKey(labelKey))
+ .filter(v -> v.labels.get(labelKey).equals(labelValue))
+ .filter(v -> v.value() == value)
+ .count()
+ >= 1,
+ "should have value with %s=%s %d",
+ labelKey,
+ labelValue,
+ value);
}
}