summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndy Liu <andyliuliming@outlook.com>2018-08-24 22:25:37 +0000
committerServer Team CI Bot <josh.powers+server-team-bot@canonical.com>2018-08-24 22:25:37 +0000
commit2320c3de2712c2f320b0d8af4aa129219cc2ad04 (patch)
treee13c9bc0269ae71a4b32ea7eb89053b6362015df
parentdab59087155d3963849a36b3f63ee662047f708b (diff)
downloadcloud-init-git-2320c3de2712c2f320b0d8af4aa129219cc2ad04.tar.gz
logging: Add logging config type hyperv for reporting via Azure KVP
Linux guests can provide information to Hyper-V hosts via KVP. KVP allows the guests to provide any string key-value-pairs back to the host's registry. On linux, kvp communication pools are presented as pool files in /var/lib/hyperv/.kvp_pool_#. The following reporting configuration can enable this kvp reporting in addition to default logging if the pool files exist: reporting:     logging:         type: log     telemetry:         type: hyperv
-rw-r--r--cloudinit/cloud.py4
-rw-r--r--cloudinit/cmd/main.py4
-rwxr-xr-xcloudinit/distros/__init__.py2
-rw-r--r--cloudinit/reporting/__init__.py2
-rw-r--r--cloudinit/reporting/handlers.py255
-rw-r--r--cloudinit/stages.py2
-rw-r--r--tests/unittests/test_reporting_hyperv.py134
7 files changed, 396 insertions, 7 deletions
diff --git a/cloudinit/cloud.py b/cloudinit/cloud.py
index 6d12c437..7ae98e1c 100644
--- a/cloudinit/cloud.py
+++ b/cloudinit/cloud.py
@@ -47,7 +47,7 @@ class Cloud(object):
@property
def cfg(self):
- # Ensure that not indirectly modified
+ # Ensure that cfg is not indirectly modified
return copy.deepcopy(self._cfg)
def run(self, name, functor, args, freq=None, clear_on_fail=False):
@@ -61,7 +61,7 @@ class Cloud(object):
return None
return fn
- # The rest of thes are just useful proxies
+ # The rest of these are just useful proxies
def get_userdata(self, apply_filter=True):
return self.datasource.get_userdata(apply_filter)
diff --git a/cloudinit/cmd/main.py b/cloudinit/cmd/main.py
index d6ba90f4..c0edee18 100644
--- a/cloudinit/cmd/main.py
+++ b/cloudinit/cmd/main.py
@@ -315,7 +315,7 @@ def main_init(name, args):
existing = "trust"
init.purge_cache()
- # Delete the non-net file as well
+ # Delete the no-net file as well
util.del_file(os.path.join(path_helper.get_cpath("data"), "no-net"))
# Stage 5
@@ -339,7 +339,7 @@ def main_init(name, args):
" Likely bad things to come!"))
if not args.force:
init.apply_network_config(bring_up=not args.local)
- LOG.debug("[%s] Exiting without datasource in local mode", mode)
+ LOG.debug("[%s] Exiting without datasource", mode)
if mode == sources.DSMODE_LOCAL:
return (None, [])
else:
diff --git a/cloudinit/distros/__init__.py b/cloudinit/distros/__init__.py
index ab0b0776..fde054e9 100755
--- a/cloudinit/distros/__init__.py
+++ b/cloudinit/distros/__init__.py
@@ -157,7 +157,7 @@ class Distro(object):
distro)
header = '\n'.join([
"# Converted from network_config for distro %s" % distro,
- "# Implmentation of _write_network_config is needed."
+ "# Implementation of _write_network_config is needed."
])
ns = network_state.parse_net_config_data(netconfig)
contents = eni.network_state_to_eni(
diff --git a/cloudinit/reporting/__init__.py b/cloudinit/reporting/__init__.py
index 1ed2b487..e047767e 100644
--- a/cloudinit/reporting/__init__.py
+++ b/cloudinit/reporting/__init__.py
@@ -18,7 +18,7 @@ DEFAULT_CONFIG = {
def update_configuration(config):
- """Update the instanciated_handler_registry.
+ """Update the instantiated_handler_registry.
:param config:
The dictionary containing changes to apply. If a key is given
diff --git a/cloudinit/reporting/handlers.py b/cloudinit/reporting/handlers.py
index 4066076c..4b4bb396 100644
--- a/cloudinit/reporting/handlers.py
+++ b/cloudinit/reporting/handlers.py
@@ -1,17 +1,34 @@
# This file is part of cloud-init. See LICENSE file for license information.
import abc
+import fcntl
import json
import six
+import os
+import re
+import struct
+import threading
+import time
from cloudinit import log as logging
from cloudinit.registry import DictRegistry
from cloudinit import (url_helper, util)
+from datetime import datetime
+if six.PY2:
+ import multiprocessing.queues as queue
+ from multiprocessing.queues import JoinableQueue as JQueue
+else:
+ import queue
+ from queue import Queue as JQueue
LOG = logging.getLogger(__name__)
+class ReportException(Exception):
+ pass
+
+
@six.add_metaclass(abc.ABCMeta)
class ReportingHandler(object):
"""Base class for report handlers.
@@ -85,9 +102,247 @@ class WebHookHandler(ReportingHandler):
LOG.warning("failed posting event: %s", event.as_string())
+class HyperVKvpReportingHandler(ReportingHandler):
+ """
+ Reports events to a Hyper-V host using Key-Value-Pair exchange protocol
+ and can be used to obtain high level diagnostic information from the host.
+
+ To use this facility, the KVP user-space daemon (hv_kvp_daemon) has to be
+ running. It reads the kvp_file when the host requests the guest to
+ enumerate the KVP's.
+
+ This reporter collates all events for a module (origin|name) in a single
+ json string in the dictionary.
+
+ For more information, see
+ https://technet.microsoft.com/en-us/library/dn798287.aspx#Linux%20guests
+ """
+ HV_KVP_EXCHANGE_MAX_VALUE_SIZE = 2048
+ HV_KVP_EXCHANGE_MAX_KEY_SIZE = 512
+ HV_KVP_RECORD_SIZE = (HV_KVP_EXCHANGE_MAX_KEY_SIZE +
+ HV_KVP_EXCHANGE_MAX_VALUE_SIZE)
+ EVENT_PREFIX = 'CLOUD_INIT'
+ MSG_KEY = 'msg'
+ RESULT_KEY = 'result'
+ DESC_IDX_KEY = 'msg_i'
+ JSON_SEPARATORS = (',', ':')
+ KVP_POOL_FILE_GUEST = '/var/lib/hyperv/.kvp_pool_1'
+
+ def __init__(self,
+ kvp_file_path=KVP_POOL_FILE_GUEST,
+ event_types=None):
+ super(HyperVKvpReportingHandler, self).__init__()
+ self._kvp_file_path = kvp_file_path
+ self._event_types = event_types
+ self.running = False
+ self.queue_lock = threading.Lock()
+ self.running_lock = threading.Lock()
+ self.q = JQueue()
+ self.kvp_file = None
+ self.incarnation_no = self._get_incarnation_no()
+ self.event_key_prefix = u"{0}|{1}".format(self.EVENT_PREFIX,
+ self.incarnation_no)
+ self._current_offset = 0
+
+ def _get_incarnation_no(self):
+ """
+ use the time passed as the incarnation number.
+ the incarnation number is the number which are used to
+ distinguish the old data stored in kvp and the new data.
+ """
+ uptime_str = util.uptime()
+ try:
+ return int(time.time() - float(uptime_str))
+ except ValueError:
+ LOG.warning("uptime '%s' not in correct format.", uptime_str)
+ return 0
+
+ def _iterate_kvps(self, offset):
+ """iterate the kvp file from the current offset."""
+ try:
+ with open(self._kvp_file_path, 'rb+') as f:
+ self.kvp_file = f
+ fcntl.flock(f, fcntl.LOCK_EX)
+ f.seek(offset)
+ record_data = f.read(self.HV_KVP_RECORD_SIZE)
+ while len(record_data) == self.HV_KVP_RECORD_SIZE:
+ self._current_offset += self.HV_KVP_RECORD_SIZE
+ kvp_item = self._decode_kvp_item(record_data)
+ yield kvp_item
+ record_data = f.read(self.HV_KVP_RECORD_SIZE)
+ fcntl.flock(f, fcntl.LOCK_UN)
+ finally:
+ self.kvp_file = None
+
+ def _event_key(self, event):
+ """
+ the event key format is:
+ CLOUD_INIT|<incarnation number>|<event_type>|<event_name>
+ """
+ return u"{0}|{1}|{2}".format(self.event_key_prefix,
+ event.event_type, event.name)
+
+ def _encode_kvp_item(self, key, value):
+ data = (struct.pack("%ds%ds" % (
+ self.HV_KVP_EXCHANGE_MAX_KEY_SIZE,
+ self.HV_KVP_EXCHANGE_MAX_VALUE_SIZE),
+ key.encode('utf-8'), value.encode('utf-8')))
+ return data
+
+ def _decode_kvp_item(self, record_data):
+ record_data_len = len(record_data)
+ if record_data_len != self.HV_KVP_RECORD_SIZE:
+ raise ReportException(
+ "record_data len not correct {0} {1}."
+ .format(record_data_len, self.HV_KVP_RECORD_SIZE))
+ k = (record_data[0:self.HV_KVP_EXCHANGE_MAX_KEY_SIZE].decode('utf-8')
+ .strip('\x00'))
+ v = (
+ record_data[
+ self.HV_KVP_EXCHANGE_MAX_KEY_SIZE:self.HV_KVP_RECORD_SIZE
+ ].decode('utf-8').strip('\x00'))
+
+ return {'key': k, 'value': v}
+
+ def _update_kvp_item(self, record_data):
+ if self.kvp_file is None:
+ raise ReportException(
+ "kvp file '{0}' not opened."
+ .format(self._kvp_file_path))
+ self.kvp_file.seek(-self.HV_KVP_RECORD_SIZE, 1)
+ self.kvp_file.write(record_data)
+
+ def _append_kvp_item(self, record_data):
+ with open(self._kvp_file_path, 'rb+') as f:
+ fcntl.flock(f, fcntl.LOCK_EX)
+ # seek to end of the file
+ f.seek(0, 2)
+ f.write(record_data)
+ f.flush()
+ fcntl.flock(f, fcntl.LOCK_UN)
+ self._current_offset = f.tell()
+
+ def _break_down(self, key, meta_data, description):
+ del meta_data[self.MSG_KEY]
+ des_in_json = json.dumps(description)
+ des_in_json = des_in_json[1:(len(des_in_json) - 1)]
+ i = 0
+ result_array = []
+ message_place_holder = "\"" + self.MSG_KEY + "\":\"\""
+ while True:
+ meta_data[self.DESC_IDX_KEY] = i
+ meta_data[self.MSG_KEY] = ''
+ data_without_desc = json.dumps(meta_data,
+ separators=self.JSON_SEPARATORS)
+ room_for_desc = (
+ self.HV_KVP_EXCHANGE_MAX_VALUE_SIZE -
+ len(data_without_desc) - 8)
+ value = data_without_desc.replace(
+ message_place_holder,
+ '"{key}":"{desc}"'.format(
+ key=self.MSG_KEY, desc=des_in_json[:room_for_desc]))
+ result_array.append(self._encode_kvp_item(key, value))
+ i += 1
+ des_in_json = des_in_json[room_for_desc:]
+ if len(des_in_json) == 0:
+ break
+ return result_array
+
+ def _encode_event(self, event):
+ """
+ encode the event into kvp data bytes.
+ if the event content reaches the maximum length of kvp value.
+ then it would be cut to multiple slices.
+ """
+ key = self._event_key(event)
+ meta_data = {
+ "name": event.name,
+ "type": event.event_type,
+ "ts": (datetime.utcfromtimestamp(event.timestamp)
+ .isoformat() + 'Z'),
+ }
+ if hasattr(event, self.RESULT_KEY):
+ meta_data[self.RESULT_KEY] = event.result
+ meta_data[self.MSG_KEY] = event.description
+ value = json.dumps(meta_data, separators=self.JSON_SEPARATORS)
+ # if it reaches the maximum length of kvp value,
+ # break it down to slices.
+ # this should be very corner case.
+ if len(value) > self.HV_KVP_EXCHANGE_MAX_VALUE_SIZE:
+ return self._break_down(key, meta_data, event.description)
+ else:
+ data = self._encode_kvp_item(key, value)
+ return [data]
+
+ def _publish_event_routine(self):
+ while True:
+ event = None
+ try:
+ # acquire the lock.
+ event = self.q.get_nowait()
+ need_append = True
+ try:
+ if not os.path.exists(self._kvp_file_path):
+ LOG.warning(
+ "skip writing events %s to %s. file not present.",
+ event.as_string(),
+ self._kvp_file_path)
+ encoded_event = self._encode_event(event)
+ # for each encoded_event
+ for encoded_data in (encoded_event):
+ for kvp in self._iterate_kvps(self._current_offset):
+ match = (
+ re.match(
+ r"^{0}\|(\d+)\|.+"
+ .format(self.EVENT_PREFIX),
+ kvp['key']
+ ))
+ if match:
+ match_groups = match.groups(0)
+ if int(match_groups[0]) < self.incarnation_no:
+ need_append = False
+ self._update_kvp_item(encoded_data)
+ break
+ if need_append:
+ self._append_kvp_item(encoded_data)
+ except IOError as e:
+ LOG.warning(
+ "failed posting event to kvp: %s e:%s",
+ event.as_string(), e)
+ self.running = False
+ break
+ finally:
+ self.q.task_done()
+ except queue.Empty:
+ with self.queue_lock:
+ # double check the queue is empty
+ if self.q.empty():
+ self.running = False
+ break
+
+ def trigger_publish_event(self):
+ if not self.running:
+ with self.running_lock:
+ if not self.running:
+ self.running = True
+ thread = threading.Thread(
+ target=self._publish_event_routine)
+ thread.start()
+
+ # since the saving to the kvp pool can be a time costing task
+ # if the kvp pool already contains a chunk of data,
+ # so defer it to another thread.
+ def publish_event(self, event):
+ if (not self._event_types or event.event_type in self._event_types):
+ with self.queue_lock:
+ self.q.put(event)
+ self.trigger_publish_event()
+
+
available_handlers = DictRegistry()
available_handlers.register_item('log', LogHandler)
available_handlers.register_item('print', PrintHandler)
available_handlers.register_item('webhook', WebHookHandler)
+available_handlers.register_item('hyperv', HyperVKvpReportingHandler)
# vi: ts=4 expandtab
diff --git a/cloudinit/stages.py b/cloudinit/stages.py
index c132b57d..8874d405 100644
--- a/cloudinit/stages.py
+++ b/cloudinit/stages.py
@@ -510,7 +510,7 @@ class Init(object):
# The default frequency if handlers don't have one
'frequency': frequency,
# This will be used when new handlers are found
- # to help write there contents to files with numbered
+ # to help write their contents to files with numbered
# names...
'handlercount': 0,
'excluded': excluded,
diff --git a/tests/unittests/test_reporting_hyperv.py b/tests/unittests/test_reporting_hyperv.py
new file mode 100644
index 00000000..2e64c6c7
--- /dev/null
+++ b/tests/unittests/test_reporting_hyperv.py
@@ -0,0 +1,134 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+from cloudinit.reporting import events
+from cloudinit.reporting import handlers
+
+import json
+import os
+
+from cloudinit import util
+from cloudinit.tests.helpers import CiTestCase
+
+
+class TestKvpEncoding(CiTestCase):
+ def test_encode_decode(self):
+ kvp = {'key': 'key1', 'value': 'value1'}
+ kvp_reporting = handlers.HyperVKvpReportingHandler()
+ data = kvp_reporting._encode_kvp_item(kvp['key'], kvp['value'])
+ self.assertEqual(len(data), kvp_reporting.HV_KVP_RECORD_SIZE)
+ decoded_kvp = kvp_reporting._decode_kvp_item(data)
+ self.assertEqual(kvp, decoded_kvp)
+
+
+class TextKvpReporter(CiTestCase):
+ def setUp(self):
+ super(TextKvpReporter, self).setUp()
+ self.tmp_file_path = self.tmp_path('kvp_pool_file')
+ util.ensure_file(self.tmp_file_path)
+
+ def test_event_type_can_be_filtered(self):
+ reporter = handlers.HyperVKvpReportingHandler(
+ kvp_file_path=self.tmp_file_path,
+ event_types=['foo', 'bar'])
+
+ reporter.publish_event(
+ events.ReportingEvent('foo', 'name', 'description'))
+ reporter.publish_event(
+ events.ReportingEvent('some_other', 'name', 'description3'))
+ reporter.q.join()
+
+ kvps = list(reporter._iterate_kvps(0))
+ self.assertEqual(1, len(kvps))
+
+ reporter.publish_event(
+ events.ReportingEvent('bar', 'name', 'description2'))
+ reporter.q.join()
+ kvps = list(reporter._iterate_kvps(0))
+ self.assertEqual(2, len(kvps))
+
+ self.assertIn('foo', kvps[0]['key'])
+ self.assertIn('bar', kvps[1]['key'])
+ self.assertNotIn('some_other', kvps[0]['key'])
+ self.assertNotIn('some_other', kvps[1]['key'])
+
+ def test_events_are_over_written(self):
+ reporter = handlers.HyperVKvpReportingHandler(
+ kvp_file_path=self.tmp_file_path)
+
+ self.assertEqual(0, len(list(reporter._iterate_kvps(0))))
+
+ reporter.publish_event(
+ events.ReportingEvent('foo', 'name1', 'description'))
+ reporter.publish_event(
+ events.ReportingEvent('foo', 'name2', 'description'))
+ reporter.q.join()
+ self.assertEqual(2, len(list(reporter._iterate_kvps(0))))
+
+ reporter2 = handlers.HyperVKvpReportingHandler(
+ kvp_file_path=self.tmp_file_path)
+ reporter2.incarnation_no = reporter.incarnation_no + 1
+ reporter2.publish_event(
+ events.ReportingEvent('foo', 'name3', 'description'))
+ reporter2.q.join()
+
+ self.assertEqual(2, len(list(reporter2._iterate_kvps(0))))
+
+ def test_events_with_higher_incarnation_not_over_written(self):
+ reporter = handlers.HyperVKvpReportingHandler(
+ kvp_file_path=self.tmp_file_path)
+
+ self.assertEqual(0, len(list(reporter._iterate_kvps(0))))
+
+ reporter.publish_event(
+ events.ReportingEvent('foo', 'name1', 'description'))
+ reporter.publish_event(
+ events.ReportingEvent('foo', 'name2', 'description'))
+ reporter.q.join()
+ self.assertEqual(2, len(list(reporter._iterate_kvps(0))))
+
+ reporter3 = handlers.HyperVKvpReportingHandler(
+ kvp_file_path=self.tmp_file_path)
+ reporter3.incarnation_no = reporter.incarnation_no - 1
+ reporter3.publish_event(
+ events.ReportingEvent('foo', 'name3', 'description'))
+ reporter3.q.join()
+ self.assertEqual(3, len(list(reporter3._iterate_kvps(0))))
+
+ def test_finish_event_result_is_logged(self):
+ reporter = handlers.HyperVKvpReportingHandler(
+ kvp_file_path=self.tmp_file_path)
+ reporter.publish_event(
+ events.FinishReportingEvent('name2', 'description1',
+ result=events.status.FAIL))
+ reporter.q.join()
+ self.assertIn('FAIL', list(reporter._iterate_kvps(0))[0]['value'])
+
+ def test_file_operation_issue(self):
+ os.remove(self.tmp_file_path)
+ reporter = handlers.HyperVKvpReportingHandler(
+ kvp_file_path=self.tmp_file_path)
+ reporter.publish_event(
+ events.FinishReportingEvent('name2', 'description1',
+ result=events.status.FAIL))
+ reporter.q.join()
+
+ def test_event_very_long(self):
+ reporter = handlers.HyperVKvpReportingHandler(
+ kvp_file_path=self.tmp_file_path)
+ description = 'ab' * reporter.HV_KVP_EXCHANGE_MAX_VALUE_SIZE
+ long_event = events.FinishReportingEvent(
+ 'event_name',
+ description,
+ result=events.status.FAIL)
+ reporter.publish_event(long_event)
+ reporter.q.join()
+ kvps = list(reporter._iterate_kvps(0))
+ self.assertEqual(3, len(kvps))
+
+ # restore from the kvp to see the content are all there
+ full_description = ''
+ for i in range(len(kvps)):
+ msg_slice = json.loads(kvps[i]['value'])
+ self.assertEqual(msg_slice['msg_i'], i)
+ full_description += msg_slice['msg']
+ self.assertEqual(description, full_description)