summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2016-11-07 19:44:53 +0000
committerGerrit Code Review <review@openstack.org>2016-11-07 19:44:53 +0000
commitfc837718d1d21971e4ed3de3d9543a8af8bb9cbf (patch)
treefafd77d35b76ac91f210f7d0aacf480e07abbe2c
parent1c6f8b60b279b053f75c63f1a66158108cdf4456 (diff)
parent6ce95900e8ef3642d7423b94a729503cc8a62c97 (diff)
downloadosprofiler-fc837718d1d21971e4ed3de3d9543a8af8bb9cbf.tar.gz
Merge "Add Log Insight driver"
-rw-r--r--osprofiler/drivers/__init__.py1
-rw-r--r--osprofiler/drivers/loginsight.py263
-rw-r--r--osprofiler/exc.py8
-rw-r--r--osprofiler/tests/drivers/test_loginsight.py296
-rw-r--r--requirements.txt3
-rw-r--r--test-requirements.txt1
6 files changed, 572 insertions, 0 deletions
diff --git a/osprofiler/drivers/__init__.py b/osprofiler/drivers/__init__.py
index f208582..bb287da 100644
--- a/osprofiler/drivers/__init__.py
+++ b/osprofiler/drivers/__init__.py
@@ -1,6 +1,7 @@
from osprofiler.drivers import base # noqa
from osprofiler.drivers import ceilometer # noqa
from osprofiler.drivers import elasticsearch_driver # noqa
+from osprofiler.drivers import loginsight # noqa
from osprofiler.drivers import messaging # noqa
from osprofiler.drivers import mongodb # noqa
from osprofiler.drivers import redis_driver # noqa
diff --git a/osprofiler/drivers/loginsight.py b/osprofiler/drivers/loginsight.py
new file mode 100644
index 0000000..4e875ae
--- /dev/null
+++ b/osprofiler/drivers/loginsight.py
@@ -0,0 +1,263 @@
+# Copyright (c) 2016 VMware, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Classes to use VMware vRealize Log Insight as the trace data store.
+"""
+
+import json
+import logging as log
+
+import netaddr
+from oslo_concurrency.lockutils import synchronized
+import requests
+import six.moves.urllib.parse as urlparse
+
+from osprofiler.drivers import base
+from osprofiler import exc
+
+LOG = log.getLogger(__name__)
+
+
+class LogInsightDriver(base.Driver):
+ """Driver for storing trace data in VMware vRealize Log Insight.
+
+ The driver uses Log Insight ingest service to store trace data and uses
+ the query service to retrieve it. The minimum required Log Insight version
+ is 3.3.
+
+ The connection string to initialize the driver should be of the format:
+ loginsight://<username>:<password>@<loginsight-host>
+
+ If the username or password contains the character ':' or '@', it must be
+ escaped using URL encoding. For example, the connection string to connect
+ to Log Insight server at 10.1.2.3 using username "osprofiler" and password
+ "p@ssword" is:
+ loginsight://osprofiler:p%40ssword@10.1.2.3
+ """
+ def __init__(
+ self, connection_str, project=None, service=None, host=None,
+ **kwargs):
+ super(LogInsightDriver, self).__init__(connection_str,
+ project=project,
+ service=service,
+ host=host)
+
+ parsed_connection = urlparse.urlparse(connection_str)
+ try:
+ creds, host = parsed_connection.netloc.split("@")
+ username, password = creds.split(":")
+ except ValueError:
+ raise ValueError("Connection string format is: loginsight://"
+ "<username>:<password>@<loginsight-host>. If the "
+ "username or password contains the character '@' "
+ "or ':', it must be escaped using URL encoding.")
+
+ username = urlparse.unquote(username)
+ password = urlparse.unquote(password)
+ self._client = LogInsightClient(host, username, password)
+
+ self._client.login()
+
+ @classmethod
+ def get_name(cls):
+ return "loginsight"
+
+ def notify(self, info):
+ """Send trace to Log Insight server."""
+
+ trace = info.copy()
+ trace["project"] = self.project
+ trace["service"] = self.service
+
+ event = {"text": "OSProfiler trace"}
+
+ def _create_field(name, content):
+ return {"name": name, "content": content}
+
+ event["fields"] = [_create_field("base_id", trace["base_id"]),
+ _create_field("trace_id", trace["trace_id"]),
+ _create_field("project", trace["project"]),
+ _create_field("service", trace["service"]),
+ _create_field("name", trace["name"]),
+ _create_field("trace", json.dumps(trace))]
+
+ self._client.send_event(event)
+
+ def get_report(self, base_id):
+ """Retrieves and parses trace data from Log Insight.
+
+ :param base_id: Trace base ID
+ """
+ response = self._client.query_events({"base_id": base_id})
+
+ if "events" in response:
+ for event in response["events"]:
+ if "fields" not in event:
+ continue
+
+ for field in event["fields"]:
+ if field["name"] == "trace":
+ trace = json.loads(field["content"])
+ trace_id = trace["trace_id"]
+ parent_id = trace["parent_id"]
+ name = trace["name"]
+ project = trace["project"]
+ service = trace["service"]
+ host = trace["info"]["host"]
+ timestamp = trace["timestamp"]
+
+ self._append_results(
+ trace_id, parent_id, name, project, service, host,
+ timestamp, trace)
+ break
+
+ return self._parse_results()
+
+
+class LogInsightClient(object):
+ """A minimal Log Insight client."""
+
+ LI_OSPROFILER_AGENT_ID = "F52D775B-6017-4787-8C8A-F21AE0AEC057"
+
+ # API paths
+ SESSIONS_PATH = "api/v1/sessions"
+ CURRENT_SESSIONS_PATH = "api/v1/sessions/current"
+ EVENTS_INGEST_PATH = "api/v1/events/ingest/%s" % LI_OSPROFILER_AGENT_ID
+ QUERY_EVENTS_BASE_PATH = "api/v1/events"
+
+ def __init__(self, host, username, password, api_port=9000,
+ api_ssl_port=9543, query_timeout=60000):
+ self._host = host
+ self._username = username
+ self._password = password
+ self._api_port = api_port
+ self._api_ssl_port = api_ssl_port
+ self._query_timeout = query_timeout
+ self._session = requests.Session()
+ self._session_id = None
+
+ def _build_base_url(self, scheme):
+ proto_str = "%s://" % scheme
+ host_str = ("[%s]" % self._host if netaddr.valid_ipv6(self._host)
+ else self._host)
+ port_str = ":%d" % (self._api_ssl_port if scheme == "https"
+ else self._api_port)
+ return proto_str + host_str + port_str
+
+ def _check_response(self, resp):
+ if resp.status_code == 440:
+ raise exc.LogInsightLoginTimeout()
+
+ if not resp.ok:
+ msg = "n/a"
+ if resp.text:
+ try:
+ body = json.loads(resp.text)
+ msg = body.get("errorMessage", msg)
+ except ValueError:
+ pass
+ else:
+ msg = resp.reason
+ raise exc.LogInsightAPIError(msg)
+
+ def _send_request(
+ self, method, scheme, path, headers=None, body=None, params=None):
+ url = "%s/%s" % (self._build_base_url(scheme), path)
+
+ headers = headers or {}
+ headers["content-type"] = "application/json"
+ body = body or {}
+ params = params or {}
+
+ req = requests.Request(
+ method, url, headers=headers, data=json.dumps(body), params=params)
+ req = req.prepare()
+ resp = self._session.send(req, verify=False)
+
+ self._check_response(resp)
+ return resp.json()
+
+ def _get_auth_header(self):
+ return {"X-LI-Session-Id": self._session_id}
+
+ def _trunc_session_id(self):
+ if self._session_id:
+ return self._session_id[-5:]
+
+ def _is_current_session_active(self):
+ try:
+ self._send_request("get",
+ "https",
+ self.CURRENT_SESSIONS_PATH,
+ headers=self._get_auth_header())
+ LOG.debug("Current session %s is active.",
+ self._trunc_session_id())
+ return True
+ except (exc.LogInsightLoginTimeout, exc.LogInsightAPIError):
+ LOG.debug("Current session %s is not active.",
+ self._trunc_session_id())
+ return False
+
+ @synchronized("li_login_lock")
+ def login(self):
+ # Another thread might have created the session while the current
+ # thread was waiting for the lock.
+ if self._session_id and self._is_current_session_active():
+ return
+
+ LOG.info("Logging into Log Insight server: %s.", self._host)
+ resp = self._send_request("post",
+ "https",
+ self.SESSIONS_PATH,
+ body={"username": self._username,
+ "password": self._password})
+
+ self._session_id = resp["sessionId"]
+ LOG.debug("Established session %s.", self._trunc_session_id())
+
+ def send_event(self, event):
+ events = {"events": [event]}
+ self._send_request("post",
+ "http",
+ self.EVENTS_INGEST_PATH,
+ body=events)
+
+ def query_events(self, params):
+ # Assumes that the keys and values in the params are strings and
+ # the operator is "CONTAINS".
+ constraints = []
+ for field, value in params.items():
+ constraints.append("%s/CONTAINS+%s" % (field, value))
+ constraints.append("timestamp/GT+0")
+
+ path = "%s/%s" % (self.QUERY_EVENTS_BASE_PATH, "/".join(constraints))
+
+ def _query_events():
+ return self._send_request("get",
+ "https",
+ path,
+ headers=self._get_auth_header(),
+ params={"limit": 20000,
+ "timeout": self._query_timeout})
+ try:
+ resp = _query_events()
+ except exc.LogInsightLoginTimeout:
+ # Login again and re-try.
+ LOG.debug("Current session timed out.")
+ self.login()
+ resp = _query_events()
+
+ return resp
diff --git a/osprofiler/exc.py b/osprofiler/exc.py
index 0ffc9c9..0f2fa33 100644
--- a/osprofiler/exc.py
+++ b/osprofiler/exc.py
@@ -22,3 +22,11 @@ class CommandError(Exception):
def __str__(self):
return self.message or self.__class__.__doc__
+
+
+class LogInsightAPIError(Exception):
+ pass
+
+
+class LogInsightLoginTimeout(Exception):
+ pass
diff --git a/osprofiler/tests/drivers/test_loginsight.py b/osprofiler/tests/drivers/test_loginsight.py
new file mode 100644
index 0000000..5e28aee
--- /dev/null
+++ b/osprofiler/tests/drivers/test_loginsight.py
@@ -0,0 +1,296 @@
+# Copyright (c) 2016 VMware, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+
+import ddt
+import mock
+
+from osprofiler.drivers import loginsight
+from osprofiler import exc
+from osprofiler.tests import test
+
+
+@ddt.ddt
+class LogInsightDriverTestCase(test.TestCase):
+
+ BASE_ID = "8d28af1e-acc0-498c-9890-6908e33eff5f"
+
+ def setUp(self):
+ super(LogInsightDriverTestCase, self).setUp()
+ self._client = mock.Mock(spec=loginsight.LogInsightClient)
+ self._project = "cinder"
+ self._service = "osapi_volume"
+ self._host = "ubuntu"
+ with mock.patch.object(loginsight, "LogInsightClient",
+ return_value=self._client):
+ self._driver = loginsight.LogInsightDriver(
+ "loginsight://username:password@host",
+ project=self._project,
+ service=self._service,
+ host=self._host)
+
+ @mock.patch.object(loginsight, "LogInsightClient")
+ def test_init(self, client_class):
+ client = mock.Mock()
+ client_class.return_value = client
+
+ loginsight.LogInsightDriver("loginsight://username:password@host")
+ client_class.assert_called_once_with("host", "username", "password")
+ client.login.assert_called_once_with()
+
+ @ddt.data("loginsight://username@host",
+ "loginsight://username:p@ssword@host",
+ "loginsight://us:rname:password@host")
+ def test_init_with_invalid_connection_string(self, conn_str):
+ self.assertRaises(ValueError, loginsight.LogInsightDriver, conn_str)
+
+ @mock.patch.object(loginsight, "LogInsightClient")
+ def test_init_with_special_chars_in_conn_str(self, client_class):
+ client = mock.Mock()
+ client_class.return_value = client
+
+ loginsight.LogInsightDriver("loginsight://username:p%40ssword@host")
+ client_class.assert_called_once_with("host", "username", "p@ssword")
+ client.login.assert_called_once_with()
+
+ def test_get_name(self):
+ self.assertEqual("loginsight", self._driver.get_name())
+
+ def _create_trace(self,
+ name,
+ timestamp,
+ parent_id="8d28af1e-acc0-498c-9890-6908e33eff5f",
+ base_id=BASE_ID,
+ trace_id="e465db5c-9672-45a1-b90b-da918f30aef6"):
+ return {"parent_id": parent_id,
+ "name": name,
+ "base_id": base_id,
+ "trace_id": trace_id,
+ "timestamp": timestamp,
+ "info": {"host": self._host}}
+
+ def _create_start_trace(self):
+ return self._create_trace("wsgi-start", "2016-10-04t11:50:21.902303")
+
+ def _create_stop_trace(self):
+ return self._create_trace("wsgi-stop", "2016-10-04t11:50:30.123456")
+
+ @mock.patch("json.dumps")
+ def test_notify(self, dumps):
+ json_str = mock.sentinel.json_str
+ dumps.return_value = json_str
+
+ trace = self._create_stop_trace()
+ self._driver.notify(trace)
+
+ trace["project"] = self._project
+ trace["service"] = self._service
+ exp_event = {"text": "OSProfiler trace",
+ "fields": [{"name": "base_id",
+ "content": trace["base_id"]},
+ {"name": "trace_id",
+ "content": trace["trace_id"]},
+ {"name": "project",
+ "content": trace["project"]},
+ {"name": "service",
+ "content": trace["service"]},
+ {"name": "name",
+ "content": trace["name"]},
+ {"name": "trace",
+ "content": json_str}]
+ }
+ self._client.send_event.assert_called_once_with(exp_event)
+
+ @mock.patch.object(loginsight.LogInsightDriver, "_append_results")
+ @mock.patch.object(loginsight.LogInsightDriver, "_parse_results")
+ def test_get_report(self, parse_results, append_results):
+ start_trace = self._create_start_trace()
+ start_trace["project"] = self._project
+ start_trace["service"] = self._service
+
+ stop_trace = self._create_stop_trace()
+ stop_trace["project"] = self._project
+ stop_trace["service"] = self._service
+
+ resp = {"events": [{"text": "OSProfiler trace",
+ "fields": [{"name": "trace",
+ "content": json.dumps(start_trace)
+ }
+ ]
+ },
+ {"text": "OSProfiler trace",
+ "fields": [{"name": "trace",
+ "content": json.dumps(stop_trace)
+ }
+ ]
+ }
+ ]
+ }
+ self._client.query_events = mock.Mock(return_value=resp)
+
+ self._driver.get_report(self.BASE_ID)
+ self._client.query_events.assert_called_once_with({"base_id":
+ self.BASE_ID})
+ append_results.assert_has_calls(
+ [mock.call(start_trace["trace_id"], start_trace["parent_id"],
+ start_trace["name"], start_trace["project"],
+ start_trace["service"], start_trace["info"]["host"],
+ start_trace["timestamp"], start_trace),
+ mock.call(stop_trace["trace_id"], stop_trace["parent_id"],
+ stop_trace["name"], stop_trace["project"],
+ stop_trace["service"], stop_trace["info"]["host"],
+ stop_trace["timestamp"], stop_trace)
+ ])
+ parse_results.assert_called_once_with()
+
+
+class LogInsightClientTestCase(test.TestCase):
+
+ def setUp(self):
+ super(LogInsightClientTestCase, self).setUp()
+ self._host = "localhost"
+ self._username = "username"
+ self._password = "password"
+ self._client = loginsight.LogInsightClient(
+ self._host, self._username, self._password)
+ self._client._session_id = "4ff800d1-3175-4b49-9209-39714ea56416"
+
+ def test_check_response_login_timeout(self):
+ resp = mock.Mock(status_code=440)
+ self.assertRaises(
+ exc.LogInsightLoginTimeout, self._client._check_response, resp)
+
+ def test_check_response_api_error(self):
+ resp = mock.Mock(status_code=401, ok=False)
+ resp.text = json.dumps(
+ {"errorMessage": "Invalid username or password.",
+ "errorCode": "FIELD_ERROR"})
+ e = self.assertRaises(
+ exc.LogInsightAPIError, self._client._check_response, resp)
+ self.assertEqual("Invalid username or password.", str(e))
+
+ @mock.patch("requests.Request")
+ @mock.patch("json.dumps")
+ @mock.patch.object(loginsight.LogInsightClient, "_check_response")
+ def test_send_request(self, check_resp, json_dumps, request_class):
+ req = mock.Mock()
+ request_class.return_value = req
+ prep_req = mock.sentinel.prep_req
+ req.prepare = mock.Mock(return_value=prep_req)
+
+ data = mock.sentinel.data
+ json_dumps.return_value = data
+
+ self._client._session = mock.Mock()
+ resp = mock.Mock()
+ self._client._session.send = mock.Mock(return_value=resp)
+ resp_json = mock.sentinel.resp_json
+ resp.json = mock.Mock(return_value=resp_json)
+
+ header = {"X-LI-Session-Id": "foo"}
+ body = mock.sentinel.body
+ params = mock.sentinel.params
+ ret = self._client._send_request(
+ "get", "https", "api/v1/events", header, body, params)
+
+ self.assertEqual(resp_json, ret)
+ exp_headers = {"X-LI-Session-Id": "foo",
+ "content-type": "application/json"}
+ request_class.assert_called_once_with(
+ "get", "https://localhost:9543/api/v1/events", headers=exp_headers,
+ data=data, params=mock.sentinel.params)
+ self._client._session.send.assert_called_once_with(prep_req,
+ verify=False)
+ check_resp.assert_called_once_with(resp)
+
+ @mock.patch.object(loginsight.LogInsightClient, "_send_request")
+ def test_is_current_session_active_with_active_session(self, send_request):
+ self.assertTrue(self._client._is_current_session_active())
+ exp_header = {"X-LI-Session-Id": self._client._session_id}
+ send_request.assert_called_once_with(
+ "get", "https", "api/v1/sessions/current", headers=exp_header)
+
+ @mock.patch.object(loginsight.LogInsightClient, "_send_request")
+ def test_is_current_session_active_with_expired_session(self,
+ send_request):
+ send_request.side_effect = exc.LogInsightLoginTimeout
+
+ self.assertFalse(self._client._is_current_session_active())
+ send_request.assert_called_once_with(
+ "get", "https", "api/v1/sessions/current",
+ headers={"X-LI-Session-Id": self._client._session_id})
+
+ @mock.patch.object(loginsight.LogInsightClient,
+ "_is_current_session_active", return_value=True)
+ @mock.patch.object(loginsight.LogInsightClient, "_send_request")
+ def test_login_with_current_session_active(self, send_request,
+ is_current_session_active):
+ self._client.login()
+ is_current_session_active.assert_called_once_with()
+ send_request.assert_not_called()
+
+ @mock.patch.object(loginsight.LogInsightClient,
+ "_is_current_session_active", return_value=False)
+ @mock.patch.object(loginsight.LogInsightClient, "_send_request")
+ def test_login(self, send_request, is_current_session_active):
+ new_session_id = "569a80aa-be5c-49e5-82c1-bb62392d2667"
+ resp = {"sessionId": new_session_id}
+ send_request.return_value = resp
+
+ self._client.login()
+ is_current_session_active.assert_called_once_with()
+ exp_body = {"username": self._username, "password": self._password}
+ send_request.assert_called_once_with(
+ "post", "https", "api/v1/sessions", body=exp_body)
+ self.assertEqual(new_session_id, self._client._session_id)
+
+ @mock.patch.object(loginsight.LogInsightClient, "_send_request")
+ def test_send_event(self, send_request):
+ event = mock.sentinel.event
+ self._client.send_event(event)
+
+ exp_body = {"events": [event]}
+ exp_path = ("api/v1/events/ingest/%s" %
+ self._client.LI_OSPROFILER_AGENT_ID)
+ send_request.assert_called_once_with(
+ "post", "http", exp_path, body=exp_body)
+
+ @mock.patch.object(loginsight.LogInsightClient, "_send_request")
+ def test_query_events(self, send_request):
+ resp = mock.sentinel.response
+ send_request.return_value = resp
+
+ self.assertEqual(resp, self._client.query_events({"foo": "bar"}))
+ exp_header = {"X-LI-Session-Id": self._client._session_id}
+ exp_params = {"limit": 20000, "timeout": self._client._query_timeout}
+ send_request.assert_called_once_with(
+ "get", "https", "api/v1/events/foo/CONTAINS+bar/timestamp/GT+0",
+ headers=exp_header, params=exp_params)
+
+ @mock.patch.object(loginsight.LogInsightClient, "_send_request")
+ @mock.patch.object(loginsight.LogInsightClient, "login")
+ def test_query_events_with_session_expiry(self, login, send_request):
+ resp = mock.sentinel.response
+ send_request.side_effect = [exc.LogInsightLoginTimeout, resp]
+
+ self.assertEqual(resp, self._client.query_events({"foo": "bar"}))
+ login.assert_called_once_with()
+ exp_header = {"X-LI-Session-Id": self._client._session_id}
+ exp_params = {"limit": 20000, "timeout": self._client._query_timeout}
+ exp_send_request_call = mock.call(
+ "get", "https", "api/v1/events/foo/CONTAINS+bar/timestamp/GT+0",
+ headers=exp_header, params=exp_params)
+ send_request.assert_has_calls([exp_send_request_call]*2)
diff --git a/requirements.txt b/requirements.txt
index 525dd4f..d499549 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,3 +2,6 @@ six>=1.9.0 # MIT
oslo.messaging>=5.2.0 # Apache-2.0
oslo.utils>=3.16.0 # Apache-2.0
WebOb>=1.6.0 # MIT
+requests>=2.10.0 # Apache-2.0
+netaddr>=0.7.13,!=0.7.16 # BSD
+oslo.concurrency>=3.8.0 # Apache-2.0
diff --git a/test-requirements.txt b/test-requirements.txt
index 075dfcf..76ba465 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -1,6 +1,7 @@
hacking>=0.11.0,<0.12 # Apache-2.0
coverage>=3.6 # Apache-2.0
+ddt>=1.0.1 # MIT
mock>=2.0 # BSD
python-subunit>=0.0.18 # Apache-2.0/BSD
testrepository>=0.0.18 # Apache-2.0/BSD