summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRyan Harper <ryan.harper@canonical.com>2018-08-31 21:47:18 +0000
committerServer Team CI Bot <josh.powers+server-team-bot@canonical.com>2018-08-31 21:47:18 +0000
commit43e51a04515686a15c410d1a16dd5ff06fd1afd4 (patch)
tree4ad8fb33fc749669f7a1ef583cfe82979b207bb2
parent9c35f9762028b8bf15cdcd6b42c0fafc233ddda3 (diff)
downloadcloud-init-git-43e51a04515686a15c410d1a16dd5ff06fd1afd4.tar.gz
hyperv_reporting_handler: simplify threaded publisher
Switch the implementation to a daemon thread which uses a blocking get from the Queue. No additional locking or flag checking is needed since the Queue itself handles acquiring the lock as needed. cloud-init only has a single producer (the main thread calling publish) and the consumer will read all events in the queue and write them out. Using the daemon mode of the thread handles flushing the queue on main exit in python3; in python2.7 we handle the EOFError that results when the publish thread calls to get() fails indicating the main thread has exited. The result is that the handler is no longer spawing a thread on each publish event but rather creates a single thread when we start up the reporter and we remove any additional use of separate locks and flags as we only have a single Queue object and we're only calling queue.put() from main thread and queue.get() from consuming thread.
-rw-r--r--cloudinit/cmd/main.py4
-rw-r--r--cloudinit/reporting/__init__.py6
-rw-r--r--cloudinit/reporting/handlers.py49
3 files changed, 29 insertions, 30 deletions
diff --git a/cloudinit/cmd/main.py b/cloudinit/cmd/main.py
index c0edee18..4ea4fe7f 100644
--- a/cloudinit/cmd/main.py
+++ b/cloudinit/cmd/main.py
@@ -877,9 +877,11 @@ def main(sysv_args=None):
rname, rdesc, reporting_enabled=report_on)
with args.reporter:
- return util.log_time(
+ retval = util.log_time(
logfunc=LOG.debug, msg="cloud-init mode '%s'" % name,
get_uptime=True, func=functor, args=(name, args))
+ reporting.flush_events()
+ return retval
if __name__ == '__main__':
diff --git a/cloudinit/reporting/__init__.py b/cloudinit/reporting/__init__.py
index e047767e..ed5c7038 100644
--- a/cloudinit/reporting/__init__.py
+++ b/cloudinit/reporting/__init__.py
@@ -37,6 +37,12 @@ def update_configuration(config):
instantiated_handler_registry.register_item(handler_name, instance)
+def flush_events():
+ for _, handler in instantiated_handler_registry.registered_items.items():
+ if hasattr(handler, 'flush'):
+ handler.flush()
+
+
instantiated_handler_registry = DictRegistry()
update_configuration(DEFAULT_CONFIG)
diff --git a/cloudinit/reporting/handlers.py b/cloudinit/reporting/handlers.py
index 4b4bb396..6d23558e 100644
--- a/cloudinit/reporting/handlers.py
+++ b/cloudinit/reporting/handlers.py
@@ -16,10 +16,8 @@ from cloudinit import (url_helper, util)
from datetime import datetime
if six.PY2:
- import multiprocessing.queues as queue
from multiprocessing.queues import JoinableQueue as JQueue
else:
- import queue
from queue import Queue as JQueue
LOG = logging.getLogger(__name__)
@@ -41,6 +39,10 @@ class ReportingHandler(object):
def publish_event(self, event):
"""Publish an event."""
+ def flush(self):
+ """Ensure ReportingHandler has published all events"""
+ pass
+
class LogHandler(ReportingHandler):
"""Publishes events to the cloud-init log at the ``DEBUG`` log level."""
@@ -134,15 +136,16 @@ class HyperVKvpReportingHandler(ReportingHandler):
super(HyperVKvpReportingHandler, self).__init__()
self._kvp_file_path = kvp_file_path
self._event_types = event_types
- self.running = False
- self.queue_lock = threading.Lock()
- self.running_lock = threading.Lock()
self.q = JQueue()
self.kvp_file = None
self.incarnation_no = self._get_incarnation_no()
self.event_key_prefix = u"{0}|{1}".format(self.EVENT_PREFIX,
self.incarnation_no)
self._current_offset = 0
+ self.publish_thread = threading.Thread(
+ target=self._publish_event_routine)
+ self.publish_thread.daemon = True
+ self.publish_thread.start()
def _get_incarnation_no(self):
"""
@@ -276,10 +279,8 @@ class HyperVKvpReportingHandler(ReportingHandler):
def _publish_event_routine(self):
while True:
- event = None
try:
- # acquire the lock.
- event = self.q.get_nowait()
+ event = self.q.get(block=True)
need_append = True
try:
if not os.path.exists(self._kvp_file_path):
@@ -302,41 +303,31 @@ class HyperVKvpReportingHandler(ReportingHandler):
if int(match_groups[0]) < self.incarnation_no:
need_append = False
self._update_kvp_item(encoded_data)
- break
+ continue
if need_append:
self._append_kvp_item(encoded_data)
except IOError as e:
LOG.warning(
"failed posting event to kvp: %s e:%s",
event.as_string(), e)
- self.running = False
- break
finally:
self.q.task_done()
- except queue.Empty:
- with self.queue_lock:
- # double check the queue is empty
- if self.q.empty():
- self.running = False
- break
-
- def trigger_publish_event(self):
- if not self.running:
- with self.running_lock:
- if not self.running:
- self.running = True
- thread = threading.Thread(
- target=self._publish_event_routine)
- thread.start()
+
+ # when main process exits, q.get() will through EOFError
+ # indicating we should exit this thread.
+ except EOFError:
+ return
# since the saving to the kvp pool can be a time costing task
# if the kvp pool already contains a chunk of data,
# so defer it to another thread.
def publish_event(self, event):
if (not self._event_types or event.event_type in self._event_types):
- with self.queue_lock:
- self.q.put(event)
- self.trigger_publish_event()
+ self.q.put(event)
+
+ def flush(self):
+ LOG.debug('HyperVReportingHandler flushing remaining events')
+ self.q.join()
available_handlers = DictRegistry()