summaryrefslogtreecommitdiff
path: root/oslo_messaging/rpc/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'oslo_messaging/rpc/client.py')
-rw-r--r--oslo_messaging/rpc/client.py53
1 files changed, 36 insertions, 17 deletions
diff --git a/oslo_messaging/rpc/client.py b/oslo_messaging/rpc/client.py
index 115198b..cbec525 100644
--- a/oslo_messaging/rpc/client.py
+++ b/oslo_messaging/rpc/client.py
@@ -21,6 +21,7 @@ import logging
from oslo_config import cfg
from oslo_messaging._drivers import base as driver_base
+from oslo_messaging import _metrics as metrics
from oslo_messaging import _utils as utils
from oslo_messaging import exceptions
from oslo_messaging import serializer as msg_serializer
@@ -146,12 +147,23 @@ class _BaseCallContext(object, metaclass=abc.ABCMeta):
self._check_version_cap(msg.get('version'))
- try:
- self.transport._send(self.target, msg_ctxt, msg,
- retry=self.retry,
- transport_options=self.transport_options)
- except driver_base.TransportDriverError as ex:
- raise ClientSendError(self.target, ex)
+ with metrics.get_collector(self.conf, "rpc_client",
+ target=self.target,
+ method=method,
+ call_type="cast") as metrics_collector:
+ try:
+ self.transport._send(self.target, msg_ctxt, msg,
+ retry=self.retry,
+ transport_options=self.transport_options)
+ except driver_base.TransportDriverError as ex:
+ self._metrics_api.rpc_client_exception_total(
+ self.target, method, "cast", ex.__class__.__name__)
+ raise ClientSendError(self.target, ex)
+ except Exception as ex:
+ if self.conf.oslo_messaging_metrics.metrics_enabled:
+ metrics_collector.rpc_client_exception_total(
+ self.target, method, "cast", ex.__class__.__name__)
+ raise
def call(self, ctxt, method, **kwargs):
"""Invoke a method and wait for a reply. See RPCClient.call()."""
@@ -170,17 +182,24 @@ class _BaseCallContext(object, metaclass=abc.ABCMeta):
self._check_version_cap(msg.get('version'))
- try:
- result = \
- self.transport._send(self.target, msg_ctxt, msg,
- wait_for_reply=True, timeout=timeout,
- call_monitor_timeout=cm_timeout,
- retry=self.retry,
- transport_options=self.transport_options)
- except driver_base.TransportDriverError as ex:
- raise ClientSendError(self.target, ex)
-
- return self.serializer.deserialize_entity(ctxt, result)
+ with metrics.get_collector(self.conf, "rpc_client",
+ target=self.target, method=method,
+ call_type="call") as metrics_collector:
+ try:
+ result = self.transport._send(
+ self.target, msg_ctxt, msg, wait_for_reply=True,
+ timeout=timeout, call_monitor_timeout=cm_timeout,
+ retry=self.retry, transport_options=self.transport_options)
+ except driver_base.TransportDriverError as ex:
+ self._metrics_api.rpc_client_exception_total(
+ self.target, method, "call", ex.__class__.__name__)
+ raise ClientSendError(self.target, ex)
+ except Exception as ex:
+ if self.conf.oslo_messaging_metrics.metrics_enabled:
+ metrics_collector.rpc_client_exception_total(
+ self.target, method, "call", ex.__class__.__name__)
+ raise
+ return self.serializer.deserialize_entity(ctxt, result)
@abc.abstractmethod
def prepare(self, exchange=_marker, topic=_marker, namespace=_marker,