summary refs log tree commit diff
diff options
context:
space:
mode:
author Zuul <zuul@review.opendev.org> 2021-06-10 00:37:43 +0000
committer Gerrit Code Review <review@openstack.org> 2021-06-10 00:37:43 +0000
commit f1b47b6ba8ee4888f6cba1efac0fbf747c5406e6 (patch)
tree e3b37fb8c6bf9f9e3880c5c65605531785ceffc7
parent 15ad4c2b7a19bd334a79aab6f2bb0e0be29bf371 (diff)
parent bdbb6d62ee20bfd5ffc59f8772a5a0e60614ba90 (diff)
download oslo-messaging-f1b47b6ba8ee4888f6cba1efac0fbf747c5406e6.tar.gz
Merge "Add Support For oslo.metrics"
-rw-r--r-- oslo_messaging/_metrics/__init__.py 19
-rw-r--r-- oslo_messaging/_metrics/client.py 256
-rw-r--r-- oslo_messaging/conffixture.py 4
-rw-r--r-- oslo_messaging/rpc/client.py 53
-rw-r--r-- oslo_messaging/tests/functional/test_functional.py 34
-rw-r--r-- releasenotes/notes/oslo-metrics-support-fe16343a637cc14b.yaml 8
-rw-r--r-- requirements.txt 3
7 files changed, 360 insertions, 17 deletions
diff --git a/oslo_messaging/_metrics/__init__.py b/oslo_messaging/_metrics/__init__.py
new file mode 100644
index 0000000..d624714
--- /dev/null
+++ b/oslo_messaging/_metrics/__init__.py
@@ -0,0 +1,19 @@
+# Copyright 2020 LINE Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the 'License'); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+__all__ = [  # public names of this package; both come from .client
+ 'MetricsCollectorClient',
+ 'get_collector',
+]
+
+from .client import *
diff --git a/oslo_messaging/_metrics/client.py b/oslo_messaging/_metrics/client.py
new file mode 100644
index 0000000..46916a1
--- /dev/null
+++ b/oslo_messaging/_metrics/client.py
@@ -0,0 +1,256 @@
+
+# Copyright 2020 LINE Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import queue
+import socket
+import threading
+import time
+
+from oslo_config import cfg
+from oslo_log import log as logging
+from oslo_metrics import message_type
+from oslo_utils import eventletutils
+from oslo_utils import importutils
+
+
+LOG = logging.getLogger(__name__)
+
+eventlet = importutils.try_import('eventlet')
+if eventlet and eventletutils.is_monkey_patched("thread"):
+ # eventlet has monkey patched the "thread" module, so ask eventlet for
+ # the original, unpatched threading module.
+ stdlib_threading = eventlet.patcher.original('threading')
+else:
+ # eventlet is missing or "thread" is not monkey patched, so the
+ # threading module imported above is used as is.
+ # get_collector() later rebinds the module-level name ``threading``
+ # to whichever module was selected here.
+ stdlib_threading = threading
+
+oslo_messaging_metrics = [  # options of the [oslo_messaging_metrics] group
+ cfg.BoolOpt('metrics_enabled', default=False,
+ help='Boolean to send rpc metrics to oslo.metrics.'),
+ cfg.IntOpt('metrics_buffer_size', default=1000,
+ help='Buffer size to store in oslo.messaging.'),
+ cfg.StrOpt('metrics_socket_file',
+ default='/var/tmp/metrics_collector.sock',
+ help='Unix domain socket file to be used'
+ ' to send rpc related metrics'),
+ cfg.StrOpt('metrics_process_name',
+ default='',
+ help='Process name which is used to identify which process'
+ ' produce metrics'),
+ cfg.IntOpt('metrics_thread_stop_timeout',
+ default=10,
+ help='Sending thread stop once metrics_thread_stop_timeout'
+ ' seconds after the last successful metrics send.'
+ ' So that this thread will not be the blocker'
+ ' when process is shutting down.'
+ ' If the process is still running, sending thread will'
+ ' be restarted at the next metrics queueing time')
+]
+cfg.CONF.register_opts(oslo_messaging_metrics, group='oslo_messaging_metrics')
+
+
+class MetricsCollectorClient(object):
+    """Queue RPC metrics and send them to an oslo.metrics collector.
+
+    Metrics are put on a bounded in-memory queue and sent by a background
+    thread, one JSON datagram per metric, over a Unix domain socket.
+
+    The instance is also a context manager.  When metrics are enabled,
+    entering it emits ``<metrics_type>_invocation_start_total`` and
+    leaving it emits ``<metrics_type>_processing_seconds`` followed by
+    ``<metrics_type>_invocation_end_total``.
+    """
+
+    def __init__(self, conf, metrics_type, **kwargs):
+        """Read the metrics options and start the sending thread.
+
+        :param conf: configuration object exposing the
+                     ``oslo_messaging_metrics`` option group
+        :param metrics_type: prefix of the emitter methods to use, such as
+                             ``"rpc_client"`` or ``"rpc_server"``
+        :param kwargs: arguments forwarded to the emitters called from
+                       ``__enter__`` and ``__exit__``
+        """
+        self.conf = conf.oslo_messaging_metrics
+        self.unix_socket = self.conf.metrics_socket_file
+        buffer_size = self.conf.metrics_buffer_size
+        self.tx_queue = queue.Queue(buffer_size)
+        # Metric taken from the queue but not yet sent successfully; kept
+        # so that send_loop() retries it before reading the queue again.
+        self.next_send_metric = None
+        self.metrics_type = metrics_type
+        self.args = kwargs
+        self.send_thread = threading.Thread(target=self.send_loop)
+        self.send_thread.start()
+
+    def __enter__(self):
+        """Record the start time and emit the invocation-start metric.
+
+        :returns: ``self``, or ``None`` when metrics are disabled
+        """
+        if not self.conf.metrics_enabled:
+            return None
+        self.start_time = time.time()
+        send_method = getattr(self, self.metrics_type +
+                              "_invocation_start_total")
+        send_method(**self.args)
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        """Emit the duration and invocation-end metrics.
+
+        Nothing is emitted when metrics are disabled.  The method returns
+        ``None``, so an exception raised in the ``with`` body propagates.
+        """
+        if self.conf.metrics_enabled:
+            duration = time.time() - self.start_time
+            send_method = getattr(
+                self, self.metrics_type + "_processing_seconds")
+            send_method(duration=duration, **self.args)
+            send_method = getattr(
+                self, self.metrics_type + "_invocation_end_total")
+            send_method(**self.args)
+
+    def put_into_txqueue(self, metrics_name, action, **labels):
+        """Build a metric and queue it for sending without blocking.
+
+        The configured process name is added as the ``process`` label.
+        If the queue is full the metric is dropped and a warning is
+        logged.  The sending thread is restarted if it has stopped.
+
+        :param metrics_name: name of the metric
+        :param action: ``message_type.MetricAction`` to apply
+        :param labels: label names and values attached to the metric
+        """
+        labels['process'] = self.conf.metrics_process_name
+        m = message_type.Metric("oslo_messaging", metrics_name, action,
+                                **labels)
+
+        try:
+            self.tx_queue.put_nowait(m)
+        except queue.Full:
+            # Arguments are passed to the logger instead of being
+            # formatted eagerly with "%".
+            LOG.warning("tx queues is already full(%s/%s). Fails to "
+                        "send the metrics(%s)",
+                        self.tx_queue.qsize(), self.tx_queue.maxsize, m)
+
+        if not self.send_thread.is_alive():
+            self.send_thread = threading.Thread(target=self.send_loop)
+            self.send_thread.start()
+
+    def send_loop(self):
+        """Send queued metrics until the stop timeout elapses.
+
+        The loop ends once ``metrics_thread_stop_timeout`` seconds have
+        passed since it started or since the last successful send.  A
+        metric that fails to send is kept in ``next_send_metric`` and
+        retried after one second.
+        """
+        timeout = self.conf.metrics_thread_stop_timeout
+        stoptime = time.time() + timeout
+        while stoptime > time.time():
+            if self.next_send_metric is None:
+                try:
+                    self.next_send_metric = self.tx_queue.get(
+                        timeout=timeout)
+                except queue.Empty:
+                    continue
+            try:
+                self.send_metric(self.next_send_metric)
+                self.next_send_metric = None
+                stoptime = time.time() + timeout
+            except Exception as e:
+                LOG.error("Failed to send metrics: %s. "
+                          "Wait 1 seconds for next try.", e)
+                time.sleep(1)
+
+    def send_metric(self, metric):
+        """Send one metric as a JSON datagram to the collector socket.
+
+        :param metric: object whose ``to_json()`` returns a string
+        :raises: whatever ``connect`` or ``send`` raises; the socket is
+                 closed in every case
+        """
+        s = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
+        try:
+            s.connect(self.unix_socket)
+            s.send(metric.to_json().encode())
+        finally:
+            # send_loop() retries every second on failure, so a socket
+            # left open on error would leak one descriptor per retry.
+            s.close()
+
+    def put_rpc_client_metrics_to_txqueue(self, metric_name, action,
+                                          target, method, call_type, timeout,
+                                          exception=None):
+        """Queue an RPC client metric labelled from ``target``.
+
+        The ``exception`` label is only added when ``exception`` is
+        truthy.
+        """
+        kwargs = {
+            'call_type': call_type,
+            'exchange': target.exchange,
+            'topic': target.topic,
+            'namespace': target.namespace,
+            'version': target.version,
+            'server': target.server,
+            'fanout': target.fanout,
+            'method': method,
+            'timeout': timeout,
+        }
+        if exception:
+            kwargs['exception'] = exception
+
+        self.put_into_txqueue(metric_name, action, **kwargs)
+
+    def rpc_client_invocation_start_total(self, target, method, call_type,
+                                          timeout=None):
+        """Increment the counter of started RPC client invocations."""
+        self.put_rpc_client_metrics_to_txqueue(
+            "rpc_client_invocation_start_total",
+            message_type.MetricAction("inc", None),
+            target, method, call_type, timeout
+        )
+
+    def rpc_client_invocation_end_total(self, target, method, call_type,
+                                        timeout=None):
+        """Increment the counter of finished RPC client invocations."""
+        self.put_rpc_client_metrics_to_txqueue(
+            "rpc_client_invocation_end_total",
+            message_type.MetricAction("inc", None),
+            target, method, call_type, timeout
+        )
+
+    def rpc_client_processing_seconds(self, target, method, call_type,
+                                      duration, timeout=None):
+        """Observe the duration of an RPC client invocation."""
+        self.put_rpc_client_metrics_to_txqueue(
+            "rpc_client_processing_seconds",
+            message_type.MetricAction("observe", duration),
+            target, method, call_type, timeout
+        )
+
+    def rpc_client_exception_total(self, target, method, call_type, exception,
+                                   timeout=None):
+        """Increment the counter of RPC client invocations that raised."""
+        self.put_rpc_client_metrics_to_txqueue(
+            "rpc_client_exception_total",
+            message_type.MetricAction("inc", None),
+            target, method, call_type, timeout, exception
+        )
+
+    def put_rpc_server_metrics_to_txqueue(self, metric_name, action,
+                                          target, endpoint, ns, ver, method,
+                                          exception=None):
+        """Queue an RPC server metric.
+
+        ``exchange``, ``topic`` and ``server`` are ``None`` unless a
+        truthy ``target`` is given.  The ``exception`` label is only
+        added when ``exception`` is truthy.
+        """
+        kwargs = {
+            'endpoint': endpoint,
+            'namespace': ns,
+            'version': ver,
+            'method': method,
+            'exchange': None,
+            'topic': None,
+            'server': None
+        }
+        if target:
+            kwargs['exchange'] = target.exchange
+            kwargs['topic'] = target.topic
+            kwargs['server'] = target.server
+        if exception:
+            kwargs['exception'] = exception
+
+        self.put_into_txqueue(metric_name, action, **kwargs)
+
+    def rpc_server_invocation_start_total(self, target, endpoint,
+                                          ns, ver, method):
+        """Increment the counter of started RPC server invocations."""
+        self.put_rpc_server_metrics_to_txqueue(
+            "rpc_server_invocation_start_total",
+            message_type.MetricAction("inc", None),
+            target, endpoint, ns, ver, method
+        )
+
+    def rpc_server_invocation_end_total(self, target, endpoint,
+                                        ns, ver, method):
+        """Increment the counter of finished RPC server invocations."""
+        self.put_rpc_server_metrics_to_txqueue(
+            "rpc_server_invocation_end_total",
+            message_type.MetricAction("inc", None),
+            target, endpoint, ns, ver, method
+        )
+
+    def rpc_server_processing_seconds(self, target, endpoint, ns, ver,
+                                      method, duration):
+        """Observe the duration of an RPC server invocation."""
+        self.put_rpc_server_metrics_to_txqueue(
+            "rpc_server_processing_seconds",
+            message_type.MetricAction("observe", duration),
+            target, endpoint, ns, ver, method
+        )
+
+    def rpc_server_exception_total(self, target, endpoint, ns, ver,
+                                   method, exception):
+        """Increment the counter of RPC server invocations that raised."""
+        self.put_rpc_server_metrics_to_txqueue(
+            "rpc_server_exception_total",
+            message_type.MetricAction("inc", None),
+            target, endpoint, ns, ver, method, exception=exception
+        )
+
+
+METRICS_COLLECTOR = None
+
+
+def get_collector(conf, metrics_type, **kwargs):
+    """Return the process-wide collector, creating it on first use.
+
+    :param conf: configuration object passed to MetricsCollectorClient
+                 when the collector is first created
+    :param metrics_type: prefix of the emitter methods used by the
+                         collector's context manager
+    :param kwargs: arguments the context manager forwards to the emitters
+    :returns: the shared MetricsCollectorClient instance
+    """
+    # Rebind the module-level ``threading`` name to the module selected at
+    # import time so the collector creates its sending thread from it.
+    global threading
+    threading = stdlib_threading
+    global METRICS_COLLECTOR
+    if METRICS_COLLECTOR is None:
+        METRICS_COLLECTOR = MetricsCollectorClient(
+            conf, metrics_type, **kwargs)
+    else:
+        # The collector is shared, so without this refresh every later
+        # ``with get_collector(...)`` would emit metrics labelled with the
+        # arguments of the very first call.
+        # NOTE(review): the labels are still stored on the shared
+        # instance, so overlapping invocations can see each other's
+        # values between __enter__ and __exit__.
+        METRICS_COLLECTOR.metrics_type = metrics_type
+        METRICS_COLLECTOR.args = kwargs
+    return METRICS_COLLECTOR
diff --git a/oslo_messaging/conffixture.py b/oslo_messaging/conffixture.py
index 39e7615..2656ebf 100644
--- a/oslo_messaging/conffixture.py
+++ b/oslo_messaging/conffixture.py
@@ -67,6 +67,10 @@ class ConfFixture(fixtures.Fixture):
'oslo_messaging.notify.notifier',
'_notifier_opts',
'oslo_messaging_notifications')
+ _import_opts(self.conf,
+ 'oslo_messaging._metrics.client',
+ 'oslo_messaging_metrics',
+ 'oslo_messaging_metrics')
if transport_url is not None:
self.transport_url = transport_url
diff --git a/oslo_messaging/rpc/client.py b/oslo_messaging/rpc/client.py
index 115198b..cbec525 100644
--- a/oslo_messaging/rpc/client.py
+++ b/oslo_messaging/rpc/client.py
@@ -21,6 +21,7 @@ import logging
from oslo_config import cfg
from oslo_messaging._drivers import base as driver_base
+from oslo_messaging import _metrics as metrics
from oslo_messaging import _utils as utils
from oslo_messaging import exceptions
from oslo_messaging import serializer as msg_serializer
@@ -146,12 +147,23 @@ class _BaseCallContext(object, metaclass=abc.ABCMeta):
self._check_version_cap(msg.get('version'))
- try:
- self.transport._send(self.target, msg_ctxt, msg,
- retry=self.retry,
- transport_options=self.transport_options)
- except driver_base.TransportDriverError as ex:
- raise ClientSendError(self.target, ex)
+ with metrics.get_collector(self.conf, "rpc_client",
+ target=self.target,
+ method=method,
+ call_type="cast") as metrics_collector:
+ try:
+ self.transport._send(self.target, msg_ctxt, msg,
+ retry=self.retry,
+ transport_options=self.transport_options)
+ except driver_base.TransportDriverError as ex:
+ self._metrics_api.rpc_client_exception_total(
+ self.target, method, "cast", ex.__class__.__name__)
+ raise ClientSendError(self.target, ex)
+ except Exception as ex:
+ if self.conf.oslo_messaging_metrics.metrics_enabled:
+ metrics_collector.rpc_client_exception_total(
+ self.target, method, "cast", ex.__class__.__name__)
+ raise
def call(self, ctxt, method, **kwargs):
"""Invoke a method and wait for a reply. See RPCClient.call()."""
@@ -170,17 +182,24 @@ class _BaseCallContext(object, metaclass=abc.ABCMeta):
self._check_version_cap(msg.get('version'))
- try:
- result = \
- self.transport._send(self.target, msg_ctxt, msg,
- wait_for_reply=True, timeout=timeout,
- call_monitor_timeout=cm_timeout,
- retry=self.retry,
- transport_options=self.transport_options)
- except driver_base.TransportDriverError as ex:
- raise ClientSendError(self.target, ex)
-
- return self.serializer.deserialize_entity(ctxt, result)
+ with metrics.get_collector(self.conf, "rpc_client",
+ target=self.target, method=method,
+ call_type="call") as metrics_collector:
+ try:
+ result = self.transport._send(
+ self.target, msg_ctxt, msg, wait_for_reply=True,
+ timeout=timeout, call_monitor_timeout=cm_timeout,
+ retry=self.retry, transport_options=self.transport_options)
+ except driver_base.TransportDriverError as ex:
+ self._metrics_api.rpc_client_exception_total(
+ self.target, method, "call", ex.__class__.__name__)
+ raise ClientSendError(self.target, ex)
+ except Exception as ex:
+ if self.conf.oslo_messaging_metrics.metrics_enabled:
+ metrics_collector.rpc_client_exception_total(
+ self.target, method, "call", ex.__class__.__name__)
+ raise
+ return self.serializer.deserialize_entity(ctxt, result)
@abc.abstractmethod
def prepare(self, exchange=_marker, topic=_marker, namespace=_marker,
diff --git a/oslo_messaging/tests/functional/test_functional.py b/oslo_messaging/tests/functional/test_functional.py
index a71f6c2..3c503ff 100644
--- a/oslo_messaging/tests/functional/test_functional.py
+++ b/oslo_messaging/tests/functional/test_functional.py
@@ -12,6 +12,8 @@
# under the License.
import os
+import requests
+import subprocess
import time
import uuid
@@ -565,3 +567,35 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
self.assertEqual('test', event[1])
self.assertEqual('Hello World!', event[2])
self.assertEqual('abc', event[3])
+
+
+class MetricsTestCase(utils.SkipIfNoTransportURL):
+    """Functional check that RPC client metrics reach oslo.metrics."""
+
+    def setUp(self):
+        """Enable metrics; skip for kafka, which has no RPC API."""
+        super(MetricsTestCase, self).setUp(conf=cfg.ConfigOpts())
+        if self.rpc_url.startswith("kafka://"):
+            self.skipTest("kafka does not support RPC API")
+
+        self.config(metrics_enabled=True,
+                    group='oslo_messaging_metrics')
+
+    def test_functional(self):
+        # verify call metrics is sent and reflected in oslo.metrics
+        self.config(metrics_socket_file='/var/tmp/metrics_collector.sock',
+                    group='oslo_messaging_metrics')
+        metric_server = subprocess.Popen(["python3", "-m", "oslo_metrics"])
+        try:
+            # Give the collector time to start before sending metrics.
+            time.sleep(1)
+            group = self.useFixture(
+                utils.RpcServerGroupFixture(self.conf, self.rpc_url))
+            client = group.client(1)
+            client.add(increment=1)
+            # Give the sending thread time to deliver the metrics.
+            time.sleep(1)
+            r = requests.get('http://localhost:3000')
+            for line in r.text.split('\n'):
+                if 'client_invocation_start_total{' in line:
+                    self.assertEqual('1.0', line[-3:])
+                elif 'client_invocation_end_total{' in line:
+                    self.assertEqual('1.0', line[-3:])
+                elif 'client_processing_seconds_count{' in line:
+                    self.assertEqual('1.0', line[-3:])
+        finally:
+            # Stop and reap the collector even when an assertion or the
+            # HTTP request fails, so no stray process is left behind.
+            metric_server.terminate()
+            metric_server.wait()
diff --git a/releasenotes/notes/oslo-metrics-support-fe16343a637cc14b.yaml b/releasenotes/notes/oslo-metrics-support-fe16343a637cc14b.yaml
new file mode 100644
index 0000000..541c987
--- /dev/null
+++ b/releasenotes/notes/oslo-metrics-support-fe16343a637cc14b.yaml
@@ -0,0 +1,8 @@
+---
+features:
+ - |
+ | Introduce support for sending rpc client metrics to oslo.metrics.
+ | This feature can be enabled by setting a configuration parameter:
+
+ [oslo_messaging_metrics]
+ metrics_enabled = True # default is false
diff --git a/requirements.txt b/requirements.txt
index 6cc4336..e027188 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -28,3 +28,6 @@ kombu>=4.6.6 # BSD
# middleware
oslo.middleware>=3.31.0 # Apache-2.0
+
+# metrics
+oslo.metrics>=0.2.1 # Apache-2.0