diff options
-rw-r--r-- | osprofiler/drivers/__init__.py | 1 | ||||
-rw-r--r-- | osprofiler/drivers/loginsight.py | 263 | ||||
-rw-r--r-- | osprofiler/exc.py | 8 | ||||
-rw-r--r-- | osprofiler/tests/drivers/test_loginsight.py | 296 | ||||
-rw-r--r-- | requirements.txt | 3 | ||||
-rw-r--r-- | test-requirements.txt | 1 |
6 files changed, 572 insertions, 0 deletions
diff --git a/osprofiler/drivers/__init__.py b/osprofiler/drivers/__init__.py index f208582..bb287da 100644 --- a/osprofiler/drivers/__init__.py +++ b/osprofiler/drivers/__init__.py @@ -1,6 +1,7 @@ from osprofiler.drivers import base # noqa from osprofiler.drivers import ceilometer # noqa from osprofiler.drivers import elasticsearch_driver # noqa +from osprofiler.drivers import loginsight # noqa from osprofiler.drivers import messaging # noqa from osprofiler.drivers import mongodb # noqa from osprofiler.drivers import redis_driver # noqa diff --git a/osprofiler/drivers/loginsight.py b/osprofiler/drivers/loginsight.py new file mode 100644 index 0000000..4e875ae --- /dev/null +++ b/osprofiler/drivers/loginsight.py @@ -0,0 +1,263 @@ +# Copyright (c) 2016 VMware, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Classes to use VMware vRealize Log Insight as the trace data store. +""" + +import json +import logging as log + +import netaddr +from oslo_concurrency.lockutils import synchronized +import requests +import six.moves.urllib.parse as urlparse + +from osprofiler.drivers import base +from osprofiler import exc + +LOG = log.getLogger(__name__) + + +class LogInsightDriver(base.Driver): + """Driver for storing trace data in VMware vRealize Log Insight. + + The driver uses Log Insight ingest service to store trace data and uses + the query service to retrieve it. The minimum required Log Insight version + is 3.3. + + The connection string to initialize the driver should be of the format: + loginsight://<username>:<password>@<loginsight-host> + + If the username or password contains the character ':' or '@', it must be + escaped using URL encoding. For example, the connection string to connect + to Log Insight server at 10.1.2.3 using username "osprofiler" and password + "p@ssword" is: + loginsight://osprofiler:p%40ssword@10.1.2.3 + """ + def __init__( + self, connection_str, project=None, service=None, host=None, + **kwargs): + super(LogInsightDriver, self).__init__(connection_str, + project=project, + service=service, + host=host) + + parsed_connection = urlparse.urlparse(connection_str) + try: + creds, host = parsed_connection.netloc.split("@") + username, password = creds.split(":") + except ValueError: + raise ValueError("Connection string format is: loginsight://" + "<username>:<password>@<loginsight-host>. If the " + "username or password contains the character '@' " + "or ':', it must be escaped using URL encoding.") + + username = urlparse.unquote(username) + password = urlparse.unquote(password) + self._client = LogInsightClient(host, username, password) + + self._client.login() + + @classmethod + def get_name(cls): + return "loginsight" + + def notify(self, info): + """Send trace to Log Insight server.""" + + trace = info.copy() + trace["project"] = self.project + trace["service"] = self.service + + event = {"text": "OSProfiler trace"} + + def _create_field(name, content): + return {"name": name, "content": content} + + event["fields"] = [_create_field("base_id", trace["base_id"]), + _create_field("trace_id", trace["trace_id"]), + _create_field("project", trace["project"]), + _create_field("service", trace["service"]), + _create_field("name", trace["name"]), + _create_field("trace", json.dumps(trace))] + + self._client.send_event(event) + + def get_report(self, base_id): + """Retrieves and parses trace data from Log Insight. + + :param base_id: Trace base ID + """ + response = self._client.query_events({"base_id": base_id}) + + if "events" in response: + for event in response["events"]: + if "fields" not in event: + continue + + for field in event["fields"]: + if field["name"] == "trace": + trace = json.loads(field["content"]) + trace_id = trace["trace_id"] + parent_id = trace["parent_id"] + name = trace["name"] + project = trace["project"] + service = trace["service"] + host = trace["info"]["host"] + timestamp = trace["timestamp"] + + self._append_results( + trace_id, parent_id, name, project, service, host, + timestamp, trace) + break + + return self._parse_results() + + +class LogInsightClient(object): + """A minimal Log Insight client.""" + + LI_OSPROFILER_AGENT_ID = "F52D775B-6017-4787-8C8A-F21AE0AEC057" + + # API paths + SESSIONS_PATH = "api/v1/sessions" + CURRENT_SESSIONS_PATH = "api/v1/sessions/current" + EVENTS_INGEST_PATH = "api/v1/events/ingest/%s" % LI_OSPROFILER_AGENT_ID + QUERY_EVENTS_BASE_PATH = "api/v1/events" + + def __init__(self, host, username, password, api_port=9000, + api_ssl_port=9543, query_timeout=60000): + self._host = host + self._username = username + self._password = password + self._api_port = api_port + self._api_ssl_port = api_ssl_port + self._query_timeout = query_timeout + self._session = requests.Session() + self._session_id = None + + def _build_base_url(self, scheme): + proto_str = "%s://" % scheme + host_str = ("[%s]" % self._host if netaddr.valid_ipv6(self._host) + else self._host) + port_str = ":%d" % (self._api_ssl_port if scheme == "https" + else self._api_port) + return proto_str + host_str + port_str + + def _check_response(self, resp): + if resp.status_code == 440: + raise exc.LogInsightLoginTimeout() + + if not resp.ok: + msg = "n/a" + if resp.text: + try: + body = json.loads(resp.text) + msg = body.get("errorMessage", msg) + except ValueError: + pass + else: + msg = resp.reason + raise exc.LogInsightAPIError(msg) + + def _send_request( + self, method, scheme, path, headers=None, body=None, params=None): + url = "%s/%s" % (self._build_base_url(scheme), path) + + headers = headers or {} + headers["content-type"] = "application/json" + body = body or {} + params = params or {} + + req = requests.Request( + method, url, headers=headers, data=json.dumps(body), params=params) + req = req.prepare() + resp = self._session.send(req, verify=False) + + self._check_response(resp) + return resp.json() + + def _get_auth_header(self): + return {"X-LI-Session-Id": self._session_id} + + def _trunc_session_id(self): + if self._session_id: + return self._session_id[-5:] + + def _is_current_session_active(self): + try: + self._send_request("get", + "https", + self.CURRENT_SESSIONS_PATH, + headers=self._get_auth_header()) + LOG.debug("Current session %s is active.", + self._trunc_session_id()) + return True + except (exc.LogInsightLoginTimeout, exc.LogInsightAPIError): + LOG.debug("Current session %s is not active.", + self._trunc_session_id()) + return False + + @synchronized("li_login_lock") + def login(self): + # Another thread might have created the session while the current + # thread was waiting for the lock. + if self._session_id and self._is_current_session_active(): + return + + LOG.info("Logging into Log Insight server: %s.", self._host) + resp = self._send_request("post", + "https", + self.SESSIONS_PATH, + body={"username": self._username, + "password": self._password}) + + self._session_id = resp["sessionId"] + LOG.debug("Established session %s.", self._trunc_session_id()) + + def send_event(self, event): + events = {"events": [event]} + self._send_request("post", + "http", + self.EVENTS_INGEST_PATH, + body=events) + + def query_events(self, params): + # Assumes that the keys and values in the params are strings and + # the operator is "CONTAINS". + constraints = [] + for field, value in params.items(): + constraints.append("%s/CONTAINS+%s" % (field, value)) + constraints.append("timestamp/GT+0") + + path = "%s/%s" % (self.QUERY_EVENTS_BASE_PATH, "/".join(constraints)) + + def _query_events(): + return self._send_request("get", + "https", + path, + headers=self._get_auth_header(), + params={"limit": 20000, + "timeout": self._query_timeout}) + try: + resp = _query_events() + except exc.LogInsightLoginTimeout: + # Login again and re-try. + LOG.debug("Current session timed out.") + self.login() + resp = _query_events() + + return resp diff --git a/osprofiler/exc.py b/osprofiler/exc.py index 0ffc9c9..0f2fa33 100644 --- a/osprofiler/exc.py +++ b/osprofiler/exc.py @@ -22,3 +22,11 @@ class CommandError(Exception): def __str__(self): return self.message or self.__class__.__doc__ + + +class LogInsightAPIError(Exception): + pass + + +class LogInsightLoginTimeout(Exception): + pass diff --git a/osprofiler/tests/drivers/test_loginsight.py b/osprofiler/tests/drivers/test_loginsight.py new file mode 100644 index 0000000..5e28aee --- /dev/null +++ b/osprofiler/tests/drivers/test_loginsight.py @@ -0,0 +1,296 @@ +# Copyright (c) 2016 VMware, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import json + +import ddt +import mock + +from osprofiler.drivers import loginsight +from osprofiler import exc +from osprofiler.tests import test + + +@ddt.ddt +class LogInsightDriverTestCase(test.TestCase): + + BASE_ID = "8d28af1e-acc0-498c-9890-6908e33eff5f" + + def setUp(self): + super(LogInsightDriverTestCase, self).setUp() + self._client = mock.Mock(spec=loginsight.LogInsightClient) + self._project = "cinder" + self._service = "osapi_volume" + self._host = "ubuntu" + with mock.patch.object(loginsight, "LogInsightClient", + return_value=self._client): + self._driver = loginsight.LogInsightDriver( + "loginsight://username:password@host", + project=self._project, + service=self._service, + host=self._host) + + @mock.patch.object(loginsight, "LogInsightClient") + def test_init(self, client_class): + client = mock.Mock() + client_class.return_value = client + + loginsight.LogInsightDriver("loginsight://username:password@host") + client_class.assert_called_once_with("host", "username", "password") + client.login.assert_called_once_with() + + @ddt.data("loginsight://username@host", + "loginsight://username:p@ssword@host", + "loginsight://us:rname:password@host") + def test_init_with_invalid_connection_string(self, conn_str): + self.assertRaises(ValueError, loginsight.LogInsightDriver, conn_str) + + @mock.patch.object(loginsight, "LogInsightClient") + def test_init_with_special_chars_in_conn_str(self, client_class): + client = mock.Mock() + client_class.return_value = client + + loginsight.LogInsightDriver("loginsight://username:p%40ssword@host") + client_class.assert_called_once_with("host", "username", "p@ssword") + client.login.assert_called_once_with() + + def test_get_name(self): + self.assertEqual("loginsight", self._driver.get_name()) + + def _create_trace(self, + name, + timestamp, + parent_id="8d28af1e-acc0-498c-9890-6908e33eff5f", + base_id=BASE_ID, + trace_id="e465db5c-9672-45a1-b90b-da918f30aef6"): + return {"parent_id": parent_id, + "name": name, + "base_id": base_id, + "trace_id": trace_id, + "timestamp": timestamp, + "info": {"host": self._host}} + + def _create_start_trace(self): + return self._create_trace("wsgi-start", "2016-10-04t11:50:21.902303") + + def _create_stop_trace(self): + return self._create_trace("wsgi-stop", "2016-10-04t11:50:30.123456") + + @mock.patch("json.dumps") + def test_notify(self, dumps): + json_str = mock.sentinel.json_str + dumps.return_value = json_str + + trace = self._create_stop_trace() + self._driver.notify(trace) + + trace["project"] = self._project + trace["service"] = self._service + exp_event = {"text": "OSProfiler trace", + "fields": [{"name": "base_id", + "content": trace["base_id"]}, + {"name": "trace_id", + "content": trace["trace_id"]}, + {"name": "project", + "content": trace["project"]}, + {"name": "service", + "content": trace["service"]}, + {"name": "name", + "content": trace["name"]}, + {"name": "trace", + "content": json_str}] + } + self._client.send_event.assert_called_once_with(exp_event) + + @mock.patch.object(loginsight.LogInsightDriver, "_append_results") + @mock.patch.object(loginsight.LogInsightDriver, "_parse_results") + def test_get_report(self, parse_results, append_results): + start_trace = self._create_start_trace() + start_trace["project"] = self._project + start_trace["service"] = self._service + + stop_trace = self._create_stop_trace() + stop_trace["project"] = self._project + stop_trace["service"] = self._service + + resp = {"events": [{"text": "OSProfiler trace", + "fields": [{"name": "trace", + "content": json.dumps(start_trace) + } + ] + }, + {"text": "OSProfiler trace", + "fields": [{"name": "trace", + "content": json.dumps(stop_trace) + } + ] + } + ] + } + self._client.query_events = mock.Mock(return_value=resp) + + self._driver.get_report(self.BASE_ID) + self._client.query_events.assert_called_once_with({"base_id": + self.BASE_ID}) + append_results.assert_has_calls( + [mock.call(start_trace["trace_id"], start_trace["parent_id"], + start_trace["name"], start_trace["project"], + start_trace["service"], start_trace["info"]["host"], + start_trace["timestamp"], start_trace), + mock.call(stop_trace["trace_id"], stop_trace["parent_id"], + stop_trace["name"], stop_trace["project"], + stop_trace["service"], stop_trace["info"]["host"], + stop_trace["timestamp"], stop_trace) + ]) + parse_results.assert_called_once_with() + + +class LogInsightClientTestCase(test.TestCase): + + def setUp(self): + super(LogInsightClientTestCase, self).setUp() + self._host = "localhost" + self._username = "username" + self._password = "password" + self._client = loginsight.LogInsightClient( + self._host, self._username, self._password) + self._client._session_id = "4ff800d1-3175-4b49-9209-39714ea56416" + + def test_check_response_login_timeout(self): + resp = mock.Mock(status_code=440) + self.assertRaises( + exc.LogInsightLoginTimeout, self._client._check_response, resp) + + def test_check_response_api_error(self): + resp = mock.Mock(status_code=401, ok=False) + resp.text = json.dumps( + {"errorMessage": "Invalid username or password.", + "errorCode": "FIELD_ERROR"}) + e = self.assertRaises( + exc.LogInsightAPIError, self._client._check_response, resp) + self.assertEqual("Invalid username or password.", str(e)) + + @mock.patch("requests.Request") + @mock.patch("json.dumps") + @mock.patch.object(loginsight.LogInsightClient, "_check_response") + def test_send_request(self, check_resp, json_dumps, request_class): + req = mock.Mock() + request_class.return_value = req + prep_req = mock.sentinel.prep_req + req.prepare = mock.Mock(return_value=prep_req) + + data = mock.sentinel.data + json_dumps.return_value = data + + self._client._session = mock.Mock() + resp = mock.Mock() + self._client._session.send = mock.Mock(return_value=resp) + resp_json = mock.sentinel.resp_json + resp.json = mock.Mock(return_value=resp_json) + + header = {"X-LI-Session-Id": "foo"} + body = mock.sentinel.body + params = mock.sentinel.params + ret = self._client._send_request( + "get", "https", "api/v1/events", header, body, params) + + self.assertEqual(resp_json, ret) + exp_headers = {"X-LI-Session-Id": "foo", + "content-type": "application/json"} + request_class.assert_called_once_with( + "get", "https://localhost:9543/api/v1/events", headers=exp_headers, + data=data, params=mock.sentinel.params) + self._client._session.send.assert_called_once_with(prep_req, + verify=False) + check_resp.assert_called_once_with(resp) + + @mock.patch.object(loginsight.LogInsightClient, "_send_request") + def test_is_current_session_active_with_active_session(self, send_request): + self.assertTrue(self._client._is_current_session_active()) + exp_header = {"X-LI-Session-Id": self._client._session_id} + send_request.assert_called_once_with( + "get", "https", "api/v1/sessions/current", headers=exp_header) + + @mock.patch.object(loginsight.LogInsightClient, "_send_request") + def test_is_current_session_active_with_expired_session(self, + send_request): + send_request.side_effect = exc.LogInsightLoginTimeout + + self.assertFalse(self._client._is_current_session_active()) + send_request.assert_called_once_with( + "get", "https", "api/v1/sessions/current", + headers={"X-LI-Session-Id": self._client._session_id}) + + @mock.patch.object(loginsight.LogInsightClient, + "_is_current_session_active", return_value=True) + @mock.patch.object(loginsight.LogInsightClient, "_send_request") + def test_login_with_current_session_active(self, send_request, + is_current_session_active): + self._client.login() + is_current_session_active.assert_called_once_with() + send_request.assert_not_called() + + @mock.patch.object(loginsight.LogInsightClient, + "_is_current_session_active", return_value=False) + @mock.patch.object(loginsight.LogInsightClient, "_send_request") + def test_login(self, send_request, is_current_session_active): + new_session_id = "569a80aa-be5c-49e5-82c1-bb62392d2667" + resp = {"sessionId": new_session_id} + send_request.return_value = resp + + self._client.login() + is_current_session_active.assert_called_once_with() + exp_body = {"username": self._username, "password": self._password} + send_request.assert_called_once_with( + "post", "https", "api/v1/sessions", body=exp_body) + self.assertEqual(new_session_id, self._client._session_id) + + @mock.patch.object(loginsight.LogInsightClient, "_send_request") + def test_send_event(self, send_request): + event = mock.sentinel.event + self._client.send_event(event) + + exp_body = {"events": [event]} + exp_path = ("api/v1/events/ingest/%s" % + self._client.LI_OSPROFILER_AGENT_ID) + send_request.assert_called_once_with( + "post", "http", exp_path, body=exp_body) + + @mock.patch.object(loginsight.LogInsightClient, "_send_request") + def test_query_events(self, send_request): + resp = mock.sentinel.response + send_request.return_value = resp + + self.assertEqual(resp, self._client.query_events({"foo": "bar"})) + exp_header = {"X-LI-Session-Id": self._client._session_id} + exp_params = {"limit": 20000, "timeout": self._client._query_timeout} + send_request.assert_called_once_with( + "get", "https", "api/v1/events/foo/CONTAINS+bar/timestamp/GT+0", + headers=exp_header, params=exp_params) + + @mock.patch.object(loginsight.LogInsightClient, "_send_request") + @mock.patch.object(loginsight.LogInsightClient, "login") + def test_query_events_with_session_expiry(self, login, send_request): + resp = mock.sentinel.response + send_request.side_effect = [exc.LogInsightLoginTimeout, resp] + + self.assertEqual(resp, self._client.query_events({"foo": "bar"})) + login.assert_called_once_with() + exp_header = {"X-LI-Session-Id": self._client._session_id} + exp_params = {"limit": 20000, "timeout": self._client._query_timeout} + exp_send_request_call = mock.call( + "get", "https", "api/v1/events/foo/CONTAINS+bar/timestamp/GT+0", + headers=exp_header, params=exp_params) + send_request.assert_has_calls([exp_send_request_call]*2) diff --git a/requirements.txt b/requirements.txt index 525dd4f..d499549 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,6 @@ six>=1.9.0 # MIT oslo.messaging>=5.2.0 # Apache-2.0 oslo.utils>=3.16.0 # Apache-2.0 WebOb>=1.6.0 # MIT +requests>=2.10.0 # Apache-2.0 +netaddr>=0.7.13,!=0.7.16 # BSD +oslo.concurrency>=3.8.0 # Apache-2.0 diff --git a/test-requirements.txt b/test-requirements.txt index 075dfcf..76ba465 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -1,6 +1,7 @@ hacking>=0.11.0,<0.12 # Apache-2.0 coverage>=3.6 # Apache-2.0 +ddt>=1.0.1 # MIT mock>=2.0 # BSD python-subunit>=0.0.18 # Apache-2.0/BSD testrepository>=0.0.18 # Apache-2.0/BSD |