diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2021-01-13 18:03:15 +0100 |
---|---|---|
committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2021-01-13 18:03:15 +0100 |
commit | 7a2415acfbd97bf509eb1eb97e98c37b51d10018 (patch) | |
tree | 3d80a86d5e00056bad1a3950a2ff592594005dd1 | |
parent | b3b77897cd3495fb35abeae278d3c231f23aadf5 (diff) | |
download | rabbitmq-server-git-7a2415acfbd97bf509eb1eb97e98c37b51d10018.tar.gz |
Test stream prometheus
5 files changed, 483 insertions, 279 deletions
diff --git a/deps/rabbitmq_stream_prometheus/src/collectors/prometheus_rabbitmq_stream_collector.erl b/deps/rabbitmq_stream_prometheus/src/collectors/prometheus_rabbitmq_stream_collector.erl index 3e1590a46c..667bd0cdfb 100644 --- a/deps/rabbitmq_stream_prometheus/src/collectors/prometheus_rabbitmq_stream_collector.erl +++ b/deps/rabbitmq_stream_prometheus/src/collectors/prometheus_rabbitmq_stream_collector.erl @@ -28,36 +28,27 @@ -define(METRIC_NAME_PREFIX, "rabbitmq_stream_"). -define(METRICS_RAW, - [% { ETS table, [ {index, conversion, Prometheus metrics name, type, help, key} ] } + [% { ETS table, [ {index, Prometheus metrics name, type, help, key} ] } {?TABLE_PUBLISHER, - [{2, - undefined, - publishers, - gauge, - "Number of publishers", - publishers}, + [{2, publishers, gauge, "Number of publishers", publishers}, {2, - undefined, publishers_messages_published_total, counter, "Total number of messages published to streams", published}, {2, - undefined, publishers_messages_confirmed_total, counter, "Total number of messages confirmed", confirmed}, {2, - undefined, publishers_messages_errored_total, counter, "Total number of messages errored", errored}]}, {?TABLE_CONSUMER, - [{2, undefined, consumers, gauge, "Number of consumers", consumers}, + [{2, consumers, gauge, "Number of consumers", consumers}, {2, - undefined, consumers_messages_consumed_total, counter, "Total number of messages from streams", @@ -108,23 +99,14 @@ get_data(Table, _) -> mf(Callback, Contents, Data) -> [begin - Fun = case Conversion of - undefined -> - fun(D) -> proplists:get_value(Key, element(Index, D)) - end; - BaseUnitConversionFactor -> - fun(D) -> - proplists:get_value(Key, element(Index, D)) - / BaseUnitConversionFactor - end - end, + Fun = fun(D) -> proplists:get_value(Key, element(Index, D)) end, Callback(prometheus_model_helpers:create_mf(?METRIC_NAME(Name), Help, catch_boolean(Type), ?MODULE, {Type, Fun, Data})) end - || {Index, Conversion, Name, Type, Help, Key} <- Contents]. + || {Index, Name, Type, Help, Key} <- Contents]. collect_metrics(_Name, {Type, Fun, Items}) -> [metric(Type, labels(Item), Fun(Item)) || Item <- Items]. diff --git a/deps/rabbitmq_stream_prometheus/test/prometheus_http_SUITE_data/Makefile b/deps/rabbitmq_stream_prometheus/test/prometheus_http_SUITE_data/Makefile index 16677504bc..fdd09d6a72 100644 --- a/deps/rabbitmq_stream_prometheus/test/prometheus_http_SUITE_data/Makefile +++ b/deps/rabbitmq_stream_prometheus/test/prometheus_http_SUITE_data/Makefile @@ -1,7 +1,7 @@ export PATH :=$(CURDIR):$(PATH) HOSTNAME := $(shell hostname) MVN_FLAGS += -Dstream.port=$(STREAM_PORT) \ - -Dmanagement.port=$(MANAGEMENT_PORT) + -Dprometheus.port=$(PROMETHEUS_PORT) .PHONY: tests clean diff --git a/deps/rabbitmq_stream_prometheus/test/prometheus_http_SUITE_data/src/test/java/com/rabbitmq/stream/MetricsUtils.java b/deps/rabbitmq_stream_prometheus/test/prometheus_http_SUITE_data/src/test/java/com/rabbitmq/stream/MetricsUtils.java new file mode 100644 index 0000000000..40616cf56c --- /dev/null +++ b/deps/rabbitmq_stream_prometheus/test/prometheus_http_SUITE_data/src/test/java/com/rabbitmq/stream/MetricsUtils.java @@ -0,0 +1,211 @@ +// The contents of this file are subject to the Mozilla Public License +// Version 2.0 (the "License"); you may not use this file except in +// compliance with the License. You may obtain a copy of the License +// at https://www.mozilla.org/en-US/MPL/2.0/ +// +// Software distributed under the License is distributed on an "AS IS" +// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +// the License for the specific language governing rights and +// limitations under the License. +// +// The Original Code is RabbitMQ. +// +// The Initial Developer of the Original Code is Pivotal Software, Inc. +// Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved. +// + +package com.rabbitmq.stream; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.StringReader; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.BiConsumer; + +public class MetricsUtils { + + static final String METRIC_PREFIX = "rabbitmq_stream_"; + static final String METRIC_PUBLISHERS = "publishers"; + static final String METRIC_PUBLISHERS_PUBLISHED = "publishers_messages_published_total"; + static final String METRIC_PUBLISHERS_CONFIRMED = "publishers_messages_confirmed_total"; + static final String METRIC_PUBLISHERS_ERRORED = "publishers_messages_errored_total"; + static final String METRIC_CONSUMERS = "consumers"; + static final String METRIC_CONSUMERS_CONSUMED = "consumers_messages_consumed_total"; + static final List<String> METRICS = + Collections.unmodifiableList( + Arrays.asList( + METRIC_PUBLISHERS, + METRIC_PUBLISHERS_PUBLISHED, + METRIC_PUBLISHERS_CONFIRMED, + METRIC_PUBLISHERS_ERRORED, + METRIC_CONSUMERS, + METRIC_CONSUMERS_CONSUMED)); + + static Metrics parseMetrics(String content) throws IOException { + Metrics metrics = new Metrics(); + try (BufferedReader reader = new BufferedReader(new StringReader(content))) { + String line; + String type = null, name = null; + Metric metric = null; + while ((line = reader.readLine()) != null) { + if (line.trim().isEmpty() + || !line.contains(METRIC_PREFIX) + || line.contains("ct_rabbitmq_stream_prometheus")) { + // empty line, non-stream metrics, + // or line containing the name of the erlang node, which is the name of the test suite + // the latter shows up in some metrics + continue; + } + if (line.startsWith("# TYPE ")) { + String[] nameType = line.replace("# TYPE ", "").split(" "); + name = nameType[0]; + type = nameType[1]; + } else if (line.startsWith("# HELP ")) { + String help = line.replace("# HELP ", "").replace(name + " ", ""); + metric = new Metric(name, type, help); + metrics.add(metric); + } else if (line.startsWith(name)) { + Map<String, String> labels = Collections.emptyMap(); + if (line.contains("{")) { + String l = line.substring(line.indexOf("{") + 1, line.indexOf("}")); + labels = + Arrays.stream(l.split(",")) + .map(label -> label.trim().split("=")) + .collect( + () -> new HashMap<>(), + (acc, keyValue) -> acc.put(keyValue[0], keyValue[1].replace("\"", "")), + (BiConsumer<Map<String, String>, Map<String, String>>) + (stringStringHashMap, stringStringHashMap2) -> + stringStringHashMap.putAll(stringStringHashMap2)); + } + int value; + try { + value = Integer.valueOf(line.split(" ")[1]); + } catch (NumberFormatException e) { + value = 0; + } + metric.add(new MetricValue(value, labels)); + } else { + throw new IllegalStateException("Cannot parse line: " + line); + } + } + } + + return metrics; + } + + static class MetricValue { + + final int value; + final Map<String, String> labels; + + MetricValue(int value, Map<String, String> labels) { + this.value = value; + this.labels = labels == null ? Collections.emptyMap() : labels; + } + + public int value() { + return value; + } + + @Override + public String toString() { + return "MetricValue{" + "value=" + value + ", labels=" + labels + '}'; + } + } + + static class Metric { + + final String name; + final String type; + final String help; + final List<MetricValue> values = new ArrayList<>(); + + Metric(String name, String type, String help) { + this.name = name.replace(METRIC_PREFIX, ""); + this.type = type; + this.help = help; + } + + void add(MetricValue value) { + values.add(value); + } + + boolean isGauge() { + return "gauge".equals(type); + } + + boolean isCounter() { + return "counter".equals(type); + } + + int value() { + if (values.size() != 1) { + throw new IllegalStateException(); + } + return values.get(0).value; + } + + public List<MetricValue> values() { + return values; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Metric metric = (Metric) o; + return name.equals(metric.name); + } + + @Override + public int hashCode() { + return Objects.hash(name); + } + + @Override + public String toString() { + return "Metric{" + + "name='" + + name + + '\'' + + ", type='" + + type + + '\'' + + ", help='" + + help + + '\'' + + ", values=" + + values + + '}'; + } + } + + static class Metrics { + + final Map<String, Metric> metrics = new HashMap<>(); + + void add(Metric metric) { + this.metrics.put(metric.name, metric); + } + + Metric get(String name) { + return metrics.get(name); + } + + @Override + public String toString() { + return "Metrics{" + "metrics=" + metrics + '}'; + } + } +} diff --git a/deps/rabbitmq_stream_prometheus/test/prometheus_http_SUITE_data/src/test/java/com/rabbitmq/stream/PrometheusHttpTest.java b/deps/rabbitmq_stream_prometheus/test/prometheus_http_SUITE_data/src/test/java/com/rabbitmq/stream/PrometheusHttpTest.java index 34bdd72e97..40914361a0 100644 --- a/deps/rabbitmq_stream_prometheus/test/prometheus_http_SUITE_data/src/test/java/com/rabbitmq/stream/PrometheusHttpTest.java +++ b/deps/rabbitmq_stream_prometheus/test/prometheus_http_SUITE_data/src/test/java/com/rabbitmq/stream/PrometheusHttpTest.java @@ -11,38 +11,40 @@ // The Original Code is RabbitMQ. // // The Initial Developer of the Original Code is Pivotal Software, Inc. -// Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved. // package com.rabbitmq.stream; -import com.rabbitmq.stream.TestUtils.ClientFactory; -import java.io.BufferedReader; +import static com.rabbitmq.stream.MetricsUtils.*; +import static com.rabbitmq.stream.MetricsUtils.parseMetrics; +import static com.rabbitmq.stream.TestUtils.*; +import static com.rabbitmq.stream.TestUtils.waitUntil; +import static java.util.stream.Collectors.toList; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.condition.AllOf.allOf; + +import com.rabbitmq.stream.MetricsUtils.Metrics; +import com.rabbitmq.stream.TestUtils.CallableSupplier; import java.io.IOException; -import java.io.StringReader; -import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.function.BiConsumer; -import java.util.function.BiFunction; -import java.util.function.BinaryOperator; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; +import java.util.stream.Stream; import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; import okhttp3.ResponseBody; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; -@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class) public class PrometheusHttpTest { static OkHttpClient httpClient = new OkHttpClient.Builder().build(); - ClientFactory cf; - String stream; static String get(String endpoint) throws IOException { return get(httpClient, endpoint); @@ -62,141 +64,221 @@ public class PrometheusHttpTest { return "http://localhost:" + TestUtils.prometheusPort() + "/metrics" + endpoint; } - static Metrics metrics() throws IOException { - return parseMetrics(get("")); + @ParameterizedTest + @CsvSource({ + METRIC_PUBLISHERS + ",true", + METRIC_PUBLISHERS_PUBLISHED + ",false", + METRIC_PUBLISHERS_CONFIRMED + ",false", + METRIC_PUBLISHERS_ERRORED + ",false", + METRIC_CONSUMERS + ",true", + METRIC_CONSUMERS_CONSUMED + ",false" + }) + void aggregatedMetricsWithNoConnectionShouldReturnZero(String name, boolean isGauge) + throws Exception { + Metrics metrics = metrics(); + assertThat(metrics.metrics).hasSameSizeAs(METRICS); + Metric metric = metrics.get(name); + assertThat(metric).isNotNull().has(help()).is(zero()).is(isGauge ? gauge() : counter()); } - static Metrics parseMetrics(String content) throws IOException { - Metrics metrics = new Metrics(); - try (BufferedReader reader = new BufferedReader(new StringReader(content))) { - String line; - String type = null, name = null; - Metric metric = null; - while ((line = reader.readLine()) != null) { - if (line.trim().isEmpty() || !line.contains("rabbitmq_stream_")) { - continue; - } - if (line.startsWith("# TYPE ")) { - String[] nameType = line.replace("# TYPE ", "").split(" "); - name = nameType[0]; - type = nameType[1]; - } else if (line.startsWith("# HELP ")) { - String help = line.replace("# HELP ", "").replace(name + " ", ""); - metric = new Metric(name, type, help); - metrics.add(metric); - } else if (line.startsWith(name)) { - Map<String, String> labels = Collections.emptyMap(); - if (line.contains("{")) { - String l = line.substring(line.indexOf("{"), line.indexOf("}")); - labels = Arrays.stream(l.split(",")).map(label -> label.trim().split("=")) - .collect(() -> new HashMap<>(), - (acc, keyValue) -> acc.put(keyValue[0], keyValue[1].replace("\"", "")), - (BiConsumer<Map<String, String>, Map<String, String>>) (stringStringHashMap, stringStringHashMap2) -> stringStringHashMap.putAll(stringStringHashMap2)); - - } - int value; - try { - value = Integer.valueOf(line.split(" ")[1]); - } catch (NumberFormatException e) { - value = 0; - } - metric.add(new MetricValue(value, labels)); - } else { - throw new IllegalStateException("Cannot parse line: " + line); - } - } - } - - return metrics; + @Test + void perObjectMetricsWithNoConnectionShouldReturnNoValue() throws Exception { + Metrics metrics = metricsPerObject(); + METRICS.forEach( + name -> { + Metric metric = metrics.get(name); + assertThat(metric).isNotNull().has(noValue()); + }); } @Test - void aggregatedMetricsWithNoConnectionShouldReturnZero() throws Exception { - Metrics metrics = metrics(); - System.out.println(metrics); - } + void aggregatedMetricsWithPublishersAndConsumersShouldReturnCorrectCounts(TestInfo info) + throws Exception { + List<String> streams = + IntStream.range(0, 5).mapToObj(i -> TestUtils.streamName(info)).collect(toList()); + int producersCount = streams.size(); + int consumersCount = streams.size() * 2; + int messagesByProducer = 10_000; + int messageCount = producersCount * messagesByProducer; - static class MetricValue { + Environment env = Environment.builder().port(TestUtils.streamPort()).build(); + List<Producer> producers = Collections.emptyList(); + List<Consumer> consumers = Collections.emptyList(); + CallableSupplier<Metrics> metricsCall = () -> metrics(); + try { + streams.forEach(stream -> env.streamCreator().stream(stream).create()); - private final int value; - private final Map<String, String> labels; + producers = + IntStream.range(0, producersCount) + .mapToObj(i -> env.producerBuilder().stream(streams.get(i % streams.size())).build()) + .collect(toList()); - MetricValue(int value, Map<String, String> labels) { - this.value = value; - this.labels = labels == null ? Collections.emptyMap() : labels; - } + waitUntil(() -> metricsCall.get().get(METRIC_PUBLISHERS).value() == producersCount); - @Override - public String toString() { - return "MetricValue{" + - "value=" + value + - ", labels=" + labels + - '}'; - } - } + CountDownLatch confirmedLatch = new CountDownLatch(messageCount); + ConfirmationHandler confirmationHandler = status -> confirmedLatch.countDown(); + producers.forEach( + producer -> { + IntStream.range(0, messagesByProducer) + .forEach( + i -> + producer.send( + producer.messageBuilder().addData("".getBytes()).build(), + confirmationHandler)); + }); - static class Metric { + assertThat(confirmedLatch.await(10, TimeUnit.SECONDS)).isTrue(); - private final String name; - private final String type; - private final String help; - private final List<MetricValue> values = new ArrayList<>(); + waitUntil(() -> metricsCall.get().get(METRIC_PUBLISHERS_CONFIRMED).value() == messageCount); - Metric(String name, String type, String help) { - this.name = name; - this.type = type; - this.help = help; - } + Metrics metrics = metricsCall.get(); + assertThat(metrics.get(METRIC_PUBLISHERS_PUBLISHED)).has(value(messageCount)); + assertThat(metrics.get(METRIC_PUBLISHERS_CONFIRMED)).has(value(messageCount)); + assertThat(metrics.get(METRIC_PUBLISHERS_ERRORED)).is(zero()); + assertThat(metrics.get(METRIC_CONSUMERS)).is(zero()); + assertThat(metrics.get(METRIC_CONSUMERS_CONSUMED)).is(zero()); - void add(MetricValue value) { - values.add(value); - } + int consumedMessageCount = consumersCount * messagesByProducer; + CountDownLatch consumedLatch = new CountDownLatch(consumedMessageCount); + consumers = + IntStream.range(0, consumersCount) + .mapToObj( + i -> + env.consumerBuilder().stream(streams.get(i % streams.size())) + .messageHandler((ctx, msg) -> consumedLatch.countDown()) + .build()) + .collect(toList()); - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - Metric metric = (Metric) o; - return name.equals(metric.name); - } + assertThat(consumedLatch.await(10, TimeUnit.SECONDS)).isTrue(); - @Override - public int hashCode() { - return Objects.hash(name); - } + waitUntil( + () -> metricsCall.get().get(METRIC_CONSUMERS_CONSUMED).value() == consumedMessageCount); + + metrics = metricsCall.get(); + assertThat(metrics.get(METRIC_CONSUMERS)).has(value(consumersCount)); + assertThat(metrics.get(METRIC_CONSUMERS_CONSUMED)).has(value(consumedMessageCount)); - @Override - public String toString() { - return "Metric{" + - "name='" + name + '\'' + - ", type='" + type + '\'' + - ", help='" + help + '\'' + - ", values=" + values + - '}'; + } finally { + producers.forEach(producer -> producer.close()); + consumers.forEach(consumer -> consumer.close()); + streams.forEach(stream -> env.deleteStream(stream)); + env.close(); } } - static class Metrics { + @Test + void perObjectMetricsWithPublishersAndConsumersShouldReturnCorrectCounts(TestInfo info) + throws Exception { + List<String> streams = + IntStream.range(0, 5).mapToObj(i -> TestUtils.streamName(info)).collect(toList()); + int producersCount = streams.size(); + int consumersCount = streams.size() * 2; + int messagesByProducer = 10_000; + int messageCount = producersCount * messagesByProducer; - private final Map<String, Metric> metrics = new HashMap<>(); + Environment env = Environment.builder().port(TestUtils.streamPort()).build(); + List<Producer> producers = Collections.emptyList(); + List<Consumer> consumers = Collections.emptyList(); + CallableSupplier<Metrics> metricsCall = () -> metricsPerObject(); + try { + streams.forEach(stream -> env.streamCreator().stream(stream).create()); - void add(Metric metric) { - this.metrics.put(metric.name, metric); - } + producers = + IntStream.range(0, producersCount) + .mapToObj(i -> env.producerBuilder().stream(streams.get(i % streams.size())).build()) + .collect(toList()); - Metric get(String name) { - return metrics.get(name); - } + CountDownLatch confirmedLatch = new CountDownLatch(messageCount); + ConfirmationHandler confirmationHandler = status -> confirmedLatch.countDown(); + producers.forEach( + producer -> { + IntStream.range(0, messagesByProducer) + .forEach( + i -> + producer.send( + producer.messageBuilder().addData("".getBytes()).build(), + confirmationHandler)); + }); + + assertThat(confirmedLatch.await(10, TimeUnit.SECONDS)).isTrue(); + + waitUntil( + () -> + metricsCall.get().get(METRIC_PUBLISHERS_CONFIRMED).values().stream() + .mapToInt(MetricValue::value) + .sum() + == messageCount); + + Metrics metrics = metricsCall.get(); + assertThat(metrics.get(METRIC_PUBLISHERS)).has(noValue()); + assertThat(metrics.get(METRIC_PUBLISHERS_PUBLISHED)) + .has(valueCount(producersCount)) + .has(valuesWithLabels("vhost", "queue", "connection", "id")) + .has( + allOf( + streams.stream() + .map(s -> value("queue", s, messagesByProducer)) + .collect(toList()))); + assertThat(metrics.get(METRIC_PUBLISHERS_CONFIRMED)) + .has(valueCount(producersCount)) + .has(valuesWithLabels("vhost", "queue", "connection", "id")) + .has( + allOf( + streams.stream() + .map(s -> value("queue", s, messagesByProducer)) + .collect(toList()))); + assertThat(metrics.get(METRIC_PUBLISHERS_ERRORED)) + .has(valueCount(producersCount)) + .has(valuesWithLabels("vhost", "queue", "connection", "id")) + .has(allOf(streams.stream().map(s -> value("queue", s, 0)).collect(toList()))); + assertThat(metrics.get(METRIC_CONSUMERS)).has(noValue()); + assertThat(metrics.get(METRIC_CONSUMERS_CONSUMED)).has(noValue()); + + int consumedMessageCount = consumersCount * messagesByProducer; + CountDownLatch consumedLatch = new CountDownLatch(consumedMessageCount); + consumers = + IntStream.range(0, consumersCount) + .mapToObj( + i -> + env.consumerBuilder().stream(streams.get(i % streams.size())) + .messageHandler((ctx, msg) -> consumedLatch.countDown()) + .build()) + .collect(toList()); + + assertThat(consumedLatch.await(10, TimeUnit.SECONDS)).isTrue(); - @Override - public String toString() { - return "Metrics{" + - "metrics=" + metrics + - '}'; + waitUntil( + () -> + metricsCall.get().get(METRIC_CONSUMERS_CONSUMED).values().stream() + .mapToInt(MetricValue::value) + .sum() + == consumedMessageCount); + + metrics = metricsCall.get(); + assertThat(metrics.get(METRIC_CONSUMERS)).has(noValue()); + assertThat(metrics.get(METRIC_CONSUMERS_CONSUMED)) + .has(valueCount(consumersCount)) + .has(valuesWithLabels("vhost", "queue", "connection", "id")) + .has( + allOf( + streams.stream() + .flatMap(s -> Stream.of(s, s)) + .map(s -> value("queue", s, messagesByProducer)) + .collect(toList()))); + + } finally { + producers.forEach(producer -> producer.close()); + consumers.forEach(consumer -> consumer.close()); + streams.forEach(stream -> env.deleteStream(stream)); + env.close(); } } + + static Metrics metrics() throws IOException { + return parseMetrics(get("")); + } + + static Metrics metricsPerObject() throws IOException { + return parseMetrics(get("/per-object")); + } } diff --git a/deps/rabbitmq_stream_prometheus/test/prometheus_http_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java b/deps/rabbitmq_stream_prometheus/test/prometheus_http_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java index 08f005f815..b911dd8ddd 100644 --- a/deps/rabbitmq_stream_prometheus/test/prometheus_http_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java +++ b/deps/rabbitmq_stream_prometheus/test/prometheus_http_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java @@ -11,32 +11,21 @@ // The Original Code is RabbitMQ. // // The Initial Developer of the Original Code is Pivotal Software, Inc. -// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved. // package com.rabbitmq.stream; -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.fail; -import com.rabbitmq.stream.impl.Client; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; -import java.lang.reflect.Field; +import com.rabbitmq.stream.MetricsUtils.Metric; import java.lang.reflect.Method; import java.time.Duration; -import java.util.Set; +import java.util.Arrays; +import java.util.Collection; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import okhttp3.Authenticator; -import okhttp3.Credentials; +import org.assertj.core.api.Condition; import org.junit.jupiter.api.TestInfo; -import org.junit.jupiter.api.extension.AfterAllCallback; -import org.junit.jupiter.api.extension.AfterEachCallback; -import org.junit.jupiter.api.extension.BeforeAllCallback; -import org.junit.jupiter.api.extension.BeforeEachCallback; -import org.junit.jupiter.api.extension.ExtensionContext; public class TestUtils { @@ -76,104 +65,15 @@ public class TestUtils { boolean getAsBoolean() throws Exception; } - static class StreamTestInfrastructureExtension - implements BeforeAllCallback, AfterAllCallback, BeforeEachCallback, AfterEachCallback { - - private static final ExtensionContext.Namespace NAMESPACE = - ExtensionContext.Namespace.create(StreamTestInfrastructureExtension.class); - - private static ExtensionContext.Store store(ExtensionContext extensionContext) { - return extensionContext.getRoot().getStore(NAMESPACE); - } - - private static EventLoopGroup eventLoopGroup(ExtensionContext context) { - return (EventLoopGroup) store(context).get("nettyEventLoopGroup"); - } - - @Override - public void beforeAll(ExtensionContext context) { - store(context).put("nettyEventLoopGroup", new NioEventLoopGroup()); - } - - @Override - public void beforeEach(ExtensionContext context) throws Exception { - try { - Field streamField = - context.getTestInstance().get().getClass().getDeclaredField("eventLoopGroup"); - streamField.setAccessible(true); - streamField.set(context.getTestInstance().get(), eventLoopGroup(context)); - } catch (NoSuchFieldException e) { - - } - try { - Field streamField = context.getTestInstance().get().getClass().getDeclaredField("stream"); - streamField.setAccessible(true); - String stream = streamName(context); - streamField.set(context.getTestInstance().get(), stream); - Client client = - new Client( - new Client.ClientParameters() - .eventLoopGroup(eventLoopGroup(context)) - .port(streamPort())); - Client.Response response = client.create(stream); - assertThat(response.isOk()).isTrue(); - client.close(); - store(context).put("testMethodStream", stream); - } catch (NoSuchFieldException e) { - - } - - for (Field declaredField : context.getTestInstance().get().getClass().getDeclaredFields()) { - if (declaredField.getType().equals(ClientFactory.class)) { - declaredField.setAccessible(true); - ClientFactory clientFactory = new ClientFactory(eventLoopGroup(context)); - declaredField.set(context.getTestInstance().get(), clientFactory); - store(context).put("testClientFactory", clientFactory); - break; - } - } - } - - @Override - public void afterEach(ExtensionContext context) throws Exception { - try { - Field streamField = context.getTestInstance().get().getClass().getDeclaredField("stream"); - streamField.setAccessible(true); - String stream = (String) streamField.get(context.getTestInstance().get()); - Client client = - new Client( - new Client.ClientParameters() - .eventLoopGroup(eventLoopGroup(context)) - .port(streamPort())); - Client.Response response = client.delete(stream); - assertThat(response.isOk()).isTrue(); - client.close(); - store(context).remove("testMethodStream"); - } catch (NoSuchFieldException e) { - - } - - ClientFactory clientFactory = (ClientFactory) store(context).get("testClientFactory"); - if (clientFactory != null) { - clientFactory.close(); - } - } - - @Override - public void afterAll(ExtensionContext context) throws Exception { - EventLoopGroup eventLoopGroup = eventLoopGroup(context); - eventLoopGroup.shutdownGracefully(1, 10, SECONDS).get(10, SECONDS); - } + @FunctionalInterface + interface CallableSupplier<T> { + T get() throws Exception; } static String streamName(TestInfo info) { return streamName(info.getTestClass().get(), info.getTestMethod().get()); } - private static String streamName(ExtensionContext context) { - return streamName(context.getTestInstance().get().getClass(), context.getTestMethod().get()); - } - private static String streamName(Class<?> testClass, Method testMethod) { String uuid = UUID.randomUUID().toString(); return String.format( @@ -181,29 +81,58 @@ public class TestUtils { testClass.getSimpleName(), testMethod.getName(), uuid.substring(uuid.length() / 2)); } - static class ClientFactory { + static Condition<Metric> gauge() { + return new Condition<>(m -> m.isGauge(), "should be a gauge"); + } - private final EventLoopGroup eventLoopGroup; - private final Set<Client> clients = ConcurrentHashMap.newKeySet(); + static Condition<Metric> counter() { + return new Condition<>(m -> m.isCounter(), "should be a counter"); + } - public ClientFactory(EventLoopGroup eventLoopGroup) { - this.eventLoopGroup = eventLoopGroup; - } + static Condition<Metric> help() { + return new Condition<>(m -> m.help != null, "should have a help description"); + } - public Client get() { - return get(new Client.ClientParameters()); - } + static Condition<Metric> zero() { + return new Condition<>( + m -> m.values.size() == 1 && m.values.get(0).value == 0, "should have one metric at 0"); + } - public Client get(Client.ClientParameters parameters) { - Client client = new Client(parameters.eventLoopGroup(eventLoopGroup).port(streamPort())); - clients.add(client); - return client; - } + static Condition<Metric> noValue() { + return new Condition<>(m -> m.values.isEmpty(), "should have no value"); + } - private void close() { - for (Client c : clients) { - c.close(); - } - } + static Condition<Metric> value(int expected) { + return new Condition<>(m -> m.value() == expected, "should have value " + expected); + } + + static Condition<Metric> valueCount(int expected) { + return new Condition<>(m -> m.values.size() == expected, "should have " + expected + " values"); + } + + static Condition<Metric> valuesWithLabels(String... expectedLabels) { + Collection<String> expected = Arrays.asList(expectedLabels); + return new Condition<>( + m -> + m.values().stream() + .map(v -> v.labels.keySet()) + .map(labels -> labels.containsAll(expected)) + .reduce(true, (b1, b2) -> b1 && b2), + "should have values with labels " + String.join(",", expected)); + } + + static Condition<Metric> value(String labelKey, String labelValue, int value) { + return new Condition<>( + m -> + m.values().stream() + .filter(v -> v.labels.containsKey(labelKey)) + .filter(v -> v.labels.get(labelKey).equals(labelValue)) + .filter(v -> v.value() == value) + .count() + >= 1, + "should have value with %s=%s %d", + labelKey, + labelValue, + value); } } |