From 868373bfb40809d681a6981872f7af68e2aaadc8 Mon Sep 17 00:00:00 2001 From: Graham Dumpleton Date: Tue, 17 Jun 2014 16:19:40 +1000 Subject: Refactor to start splitting generic metrics collection from New Relic platform reporting. Also added new capabilities to monitor processes. --- src/metrics/newrelic/agent.py | 330 +++++++++++++++++---- src/metrics/newrelic/interface.py | 189 ------------ src/metrics/newrelic/platform.py | 189 ++++++++++++ src/metrics/newrelic/sampler.py | 604 -------------------------------------- src/metrics/sampler.py | 125 ++++++++ src/metrics/scoreboard.py | 568 +++++++++++++++++++++++++++++++++++ src/metrics/statistics.py | 89 ++++++ 7 files changed, 1249 insertions(+), 845 deletions(-) delete mode 100644 src/metrics/newrelic/interface.py create mode 100644 src/metrics/newrelic/platform.py delete mode 100644 src/metrics/newrelic/sampler.py create mode 100644 src/metrics/sampler.py create mode 100644 src/metrics/scoreboard.py create mode 100644 src/metrics/statistics.py diff --git a/src/metrics/newrelic/agent.py b/src/metrics/newrelic/agent.py index 2b4b74c..86ac388 100644 --- a/src/metrics/newrelic/agent.py +++ b/src/metrics/newrelic/agent.py @@ -1,5 +1,7 @@ import os import logging +import math +import psutil try: from ConfigParser import RawConfigParser, NoOptionError, NoSectionError @@ -8,79 +10,93 @@ except ImportError: import mod_wsgi -from .interface import Interface -from .sampler import Sampler +from .platform import Client +from ..sampler import Sampler +from ..statistics import Metrics, Stats _logger = logging.getLogger(__name__) -class Agent(object): +def configuration_settings(app_name=None, license_key=None, + config_file=None, environment=None): - def __init__(self, app_name=None, license_key=None, config_file=None, - environment=None): + if config_file is None: + config_file = os.environ.get('NEW_RELIC_CONFIG_FILE', None) - self.sampler = None + if config_file is not None: + config_object = RawConfigParser() - if mod_wsgi.version < (4, 2, 0): - _logger.fatal('Version 4.2.0 or newer of mod_wsgi is required ' - 'for running the New Relic platform plugin. 
The plugin ' - 'has been disabled.') + if config_file: + config_object.read([config_file]) - return + if environment is None: + environment = os.environ.get('NEW_RELIC_ENVIRONMENT', None) + + def _option(name, section='newrelic', type=None, **kwargs): + try: + getter = 'get%s' % (type or '') + return getattr(config_object, getter)(section, name) + except NoOptionError: + if 'default' in kwargs: + return kwargs['default'] + else: + raise - if config_file is None: - config_file = os.environ.get('NEW_RELIC_CONFIG_FILE', None) + def option(name, type=None, **kwargs): + sections = [] - if config_file is not None: - config_object = RawConfigParser() + if environment is not None: + sections.append('newrelic-platform:%s' % environment) - if config_file: - config_object.read([config_file]) + sections.append('newrelic-platform') - if environment is None: - environment = os.environ.get('NEW_RELIC_ENVIRONMENT', None) + if environment is not None: + sections.append('newrelic:%s' % environment) - def _option(name, section='newrelic', type=None, **kwargs): + sections.append('newrelic') + + for section in sections: try: - getter = 'get%s' % (type or '') - return getattr(config_object, getter)(section, name) - except NoOptionError: - if 'default' in kwargs: - return kwargs['default'] - else: - raise + return _option(name, section, type) + except (NoOptionError, NoSectionError): + pass - def option(name, type=None, **kwargs): - sections = [] + if 'default' in kwargs: + return kwargs['default'] - if environment is not None: - sections.append('newrelic-platform:%s' % environment) + if app_name is None: + app_name = os.environ.get('NEW_RELIC_APP_NAME', None) + app_name = option('app_name', default=app_name) - sections.append('newrelic-platform') + if license_key is None: + license_key = os.environ.get('NEW_RELIC_LICENSE_KEY', None) + license_key = option('license_key', default=license_key) - if environment is not None: - sections.append('newrelic:%s' % environment) + if app_name is not None: + app_name = app_name.split(';')[0].strip() - sections.append('newrelic') + return app_name, license_key - for section in sections: - try: - return _option(name, section, type) - except (NoOptionError, NoSectionError): - pass +class Agent(object): - if 'default' in kwargs: - return kwargs['default'] + guid = 'au.com.dscpl.wsgi.mod_wsgi' + version = '1.1.0' + + max_retries = 10 - if app_name is None: - app_name = os.environ.get('NEW_RELIC_APP_NAME', None) - app_name = option('app_name', default=app_name) + def __init__(self, sampler=None, app_name=None, license_key=None, + config_file=None, environment=None): - if license_key is None: - license_key = os.environ.get('NEW_RELIC_LICENSE_KEY', None) - license_key = option('license_key', default=license_key) + self.sampler = None - if app_name is not None: - app_name = app_name.split(';')[0].strip() + if mod_wsgi.version < (4, 2, 0): + _logger.fatal('Version 4.2.0 or newer of mod_wsgi is required ' + 'for running the New Relic platform plugin. 
The plugin ' + 'has been disabled.') + + return + + app_name, license_key = configuration_settings(app_name, + license_key, config_file, environment) if not license_key or not app_name: _logger.fatal('Either the license key or application name was ' @@ -91,8 +107,218 @@ class Agent(object): _logger.info('New Relic platform plugin reporting to %r.', app_name) - self.interface = Interface(license_key) - self.sampler = Sampler(self.interface, app_name) + self.client = Client(license_key) + + self.license_key = license_key + self.app_name = app_name + + self.sampler = sampler or Sampler() + + self.sampler.register(self.process) + + self.metrics = Metrics() + self.epoch = None + self.retries = 0 + + def upload(self, metrics, duration): + try: + self.client.send_metrics(self.app_name, self.guid, self.version, + duration, metrics) + + except self.client.RetryDataForRequest: + return True + + except Exception: + pass + + return False + + def record(self, name, value): + name = 'Component/' + name + self.metrics.merge_value(name, value) + + def rollover(self): + self.metrics = Metrics() + self.epoch = None + self.retries = 0 + + def process(self, scoreboard): + # Record metric to track how many Apache server instances are + # reporting. The 'Server/Instances' metric should be charted as + # a 'Count', rounded to an 'Integer'. + + self.record('Server/Instances[|servers]', 0) + + # If this is the first sampling period, take that to mean that + # this is a new process and Apache was just (re)started. If we + # are being told the sampler is exiting, we take it that Apache + # is being shutdown. Both can show up if shutdown during the + # first sampling period. The 'Server/Lifecycle' metrics should + # be charted as a 'Count', rounded to an 'Integer'. + + if scoreboard.sample_periods == 1: + self.record('Server/Lifecycle/Starting[|servers]', 0) + + if scoreboard.sampler_exiting: + self.record('Server/Lifecycle/Stopping[|servers]', 0) + + # Record metric to track how many processes are in use. This is + # calculated as an average from the total number which which + # were reported in use in each individual sample. The + # 'Process/Instances' metric should be charted as a 'Count', + # rounded to an 'Integer'. + + self.record('Processes/Instances[|processes]', Stats( + count=scoreboard.processes_running)) + + # Also separately record how many processes were counted as + # having been started or stopped in the sampling period. These + # would be used to represent the amount of process churn which + # is occuring due to Apache's dynamic management of the number + # of processes. The 'Process/Lifecycle' metrics should be + # charted as a 'Count', rounded to an 'Integer'. + + self.record('Processes/Lifecycle/Starting[|processes]', + Stats(count=scoreboard.processes_started_count)) + + self.record('Processes/Lifecycle/Stopping[|processes]', + Stats(count=scoreboard.processes_stopped_count)) + + # Record metric to track how many workers are in idle and busy + # states. This is calculated as an average from the total number + # which were reported in each state in each individual sample. + # The 'Workers/Availability' metrics should be charted as a + # 'Count', rounded to an 'Integer'. + + self.record('Workers/Availability/Idle[|workers]', Stats( + count=scoreboard.workers_idle)) + self.record('Workers/Availability/Busy[|workers]', Stats( + count=scoreboard.workers_busy)) + + # Record metric to track more fine grained status of each + # worker. 
This is calculated as an average from the total number + # which were reported in each state in each individual sample. + # The 'Workers/Status' metrics should be charted as 'Average' + # value, rounded to an 'Integer'. + + for label, value in scoreboard.workers_status.items(): + self.record('Workers/Status/%s[workers]' % label, value) + + # Record metric to track the utilisation of the server. The + # 'Workers/Utilization' metric should be charted as 'Average + # value', with number format of 'Percentage'. + + self.record('Workers/Utilization[server]', + scoreboard.workers_utilization) + + # Record metric to track the request throughput. The + # 'Requests/Throughput' metric should be charted as 'Throughput'. + + self.record('Requests/Throughput[|requests]', Stats( + count=scoreboard.access_count_delta, + total=scoreboard.access_count_delta)) + + # Record metric to track number of bytes served up. This is + # believed only to be from response content. There is no known + # separate measure for bytes uploaded. The 'Requests/Bytes Served' + # should be charted as 'Rate'. + + self.record('Requests/Bytes Served[bytes]', + scoreboard.bytes_served_delta) + + # Record metric to track request response time. This is + # calculated as an average from the request samples. That is, it + # is not across all requests. The 'Requests/Response Time' + # metric should be charted as 'Average'. + + for request in scoreboard.request_samples: + self.record('Requests/Response Time[seconds|request]', + request.duration) + + # Record metric to track percentile breakdown of request + # response time. That is, it is not across all requests. The + # 'Requests/Percentiles' metric should be charted as 'Average'. + + for label, value in scoreboard.request_percentiles.items(): + self.record('Requests/Percentiles/%s[seconds]' % label, value) + + # Record metric to track what percentage of all requests were + # captured as samples. The 'Requests/Sample Quality' metric + # should be charted as 'Average' converted to a 'Percentage'. + + self.record('Requests/Sample Quality[requests]', + scoreboard.request_samples_quality) + + user_time = 0.0 + system_time = 0.0 + + memory_rss = 0 + + for process in scoreboard.processes_system_info.values(): + user_time += process['cpu_user_time'] + system_time += process['cpu_system_time'] + memory_rss += process['memory_rss'] + + # Record metric to track memory usage by processes. The + # 'Processes/Memory/Physical' metric should be charted as + # 'Average'. + + self.record('Processes/Memory/Physical[bytes]', + process['memory_rss']) + + # Record metrics to track the number of context switches. + # The 'Processes/Context Switches' metrics should be charted + # as 'Rate'. + + self.record('Processes/Context Switches/Voluntary[context]', + process['ctx_switch_voluntary']) + self.record('Processes/Context Switches/Involuntary[context]', + process['ctx_switch_involuntary']) + + # Record metric to track combined memory usage of whole server. + # The 'Server/Memory/Physical' metric should be charted as + # 'Average'. + + self.record('Server/Memory/Physical[bytes]', memory_rss) + + # Record metric to track the CPU usage for user and system. The + # 'Processes/CPU Usage' metric should be charted as 'Rate'. + + self.record('Processes/CPU Usage[cpu]', user_time + system_time) + self.record('Processes/CPU Usage/User[cpu]', user_time) + self.record('Processes/CPU Usage/System[cpu]', system_time) + + # Now attempt to upload the metric data to New Relic. 
Make sure + # we don't try and upload data from too short of a sampling + # period as it will be rejected anyway. Retain any which is too + # short so it is merged with subsequent sampling period. + + if self.epoch is not None: + duration = scoreboard.period_end - self.epoch + else: + duration = scoreboard.duration + + if duration > 1.0: + retry = self.upload(self.metrics.metrics, duration) + else: + retry = True + + # If a failure occurred but the failure type was such that we + # could try again to upload the data, then retain the metrics + # for next time. If we have two many failed attempts though we + # give up. + + if retry: + self.retries += 1 + + if self.retries == self.max_retries: + self.rollover() + + elif self.epoch is None: + self.epoch = scoreboard.period_start + + else: + self.rollover() def start(self): if self.sampler is not None: diff --git a/src/metrics/newrelic/interface.py b/src/metrics/newrelic/interface.py deleted file mode 100644 index a9db15f..0000000 --- a/src/metrics/newrelic/interface.py +++ /dev/null @@ -1,189 +0,0 @@ -import zlib -import sys -import socket -import os -import types -import json -import logging - -try: - import http.client as httplib -except ImportError: - import httplib - -_logger = logging.getLogger(__name__) - -# Python 3 compatibility helpers. - -PY2 = sys.version_info[0] == 2 -PY3 = sys.version_info[0] == 3 - -if PY3: - def b(s): - return s.encode('latin-1') -else: - def b(s): - return s - -# Helpers for json encoding and decoding. - -def json_encode(obj, **kwargs): - _kwargs = {} - - if type(b'') is type(''): - _kwargs['encoding'] = 'latin-1' - - def _encode(o): - if isinstance(o, bytes): - return o.decode('latin-1') - elif isinstance(o, types.GeneratorType): - return list(o) - elif hasattr(o, '__iter__'): - return list(iter(o)) - raise TypeError(repr(o) + ' is not JSON serializable') - - _kwargs['default'] = _encode - _kwargs['separators'] = (',', ':') - - _kwargs.update(kwargs) - - return json.dumps(obj, **_kwargs) - -def json_decode(s, **kwargs): - return json.loads(s, **kwargs) - -# Platform plugin interface. 
- -class Interface(object): - - class NetworkInterfaceException(Exception): pass - class DiscardDataForRequest(NetworkInterfaceException): pass - class RetryDataForRequest(NetworkInterfaceException): pass - class ServerIsUnavailable(RetryDataForRequest): pass - - USER_AGENT = 'ModWsgi-PythonPlugin/%s (Python %s %s)' % ( - '1.0.0', sys.version.split()[0], sys.platform) - - HOST = 'platform-api.newrelic.com' - URL = '/platform/v1/metrics' - - def __init__(self, license_key): - self.license_key = license_key - - def send_request(self, payload=()): - headers = {} - config = {} - - license_key = self.license_key - - if not self.license_key: - license_key = 'INVALID LICENSE KEY' - - headers['User-Agent'] = self.USER_AGENT - headers['Content-Encoding'] = 'identity' - headers['X-License-Key'] = license_key - - try: - data = json_encode(payload) - - except Exception as exc: - _logger.exception('Error encoding data for JSON payload ' - 'with payload of %r.', payload) - - raise Interface.DiscardDataForRequest(str(exc)) - - if len(data) > 64*1024: - headers['Content-Encoding'] = 'deflate' - level = (len(data) < 2000000) and 1 or 9 - data = zlib.compress(b(data), level) - - try: - connection = httplib.HTTPSConnection(self.HOST, timeout=30.0) - connection.request('POST', self.URL, data, headers) - response = connection.getresponse() - content = response.read() - - except httplib.HTTPException as exc: - raise Interface.RetryDataForRequest(str(exc)) - - finally: - connection.close() - - if response.status != 200: - _logger.debug('Received a non 200 HTTP response from the data ' - 'collector where headers=%r, status=%r and content=%r.', - headers, response.status, content) - - if response.status == 400: - if headers['Content-Encoding'] == 'deflate': - data = zlib.decompress(data) - - _logger.error('Data collector is indicating that a bad ' - 'request has been submitted for headers of %r and ' - 'payload of %r with response of %r.', headers, data, - content) - - raise Interface.DiscardDataForRequest() - - elif response.status == 403: - _logger.error('Data collector is indicating that the license ' - 'key %r is not valid.', license_key) - - raise Interface.DiscardDataForRequest() - - elif response.status == 413: - _logger.warning('Data collector is indicating that a request ' - 'was received where the request content size was over ' - 'the maximum allowed size limit. The length of the ' - 'request content was %d.', len(data)) - - raise Interface.DiscardDataForRequest() - - elif response.status in (503, 504): - _logger.warning('Data collector is unavailable.') - - raise Interface.ServerIsUnavailable() - - elif response.status != 200: - _logger.warning('An unexpected HTTP response was received ' - 'from the data collector of %r. The payload for ' - 'the request was %r.', respnse.status, payload) - - raise Interface.DiscardDataForRequest() - - try: - if PY3: - content = content.decode('UTF-8') - - result = json_decode(content) - - except Exception as exc: - _logger.exception('Error decoding data for JSON payload ' - 'with payload of %r.', content) - - raise Interface.DiscardDataForRequest(str(exc)) - - if 'status' in result: - return result['status'] - - error_message = result['error'] - - raise Interface.DiscardDataForRequest(error_message) - - def send_metrics(self, name, guid, version, duration, metrics): - agent = {} - agent['host'] = socket.gethostname() - agent['pid'] = os.getpid() - agent['version'] = version or '0.0.0.' 
- - component = {} - component['name'] = name - component['guid'] = guid - component['duration'] = duration - component['metrics'] = metrics - - payload = {} - payload['agent'] = agent - payload['components'] = [component] - - return self.send_request(payload) diff --git a/src/metrics/newrelic/platform.py b/src/metrics/newrelic/platform.py new file mode 100644 index 0000000..302bac0 --- /dev/null +++ b/src/metrics/newrelic/platform.py @@ -0,0 +1,189 @@ +import zlib +import sys +import socket +import os +import types +import json +import logging + +try: + import http.client as httplib +except ImportError: + import httplib + +_logger = logging.getLogger(__name__) + +# Python 3 compatibility helpers. + +PY2 = sys.version_info[0] == 2 +PY3 = sys.version_info[0] == 3 + +if PY3: + def b(s): + return s.encode('latin-1') +else: + def b(s): + return s + +# Helpers for json encoding and decoding. + +def json_encode(obj, **kwargs): + _kwargs = {} + + if type(b'') is type(''): + _kwargs['encoding'] = 'latin-1' + + def _encode(o): + if isinstance(o, bytes): + return o.decode('latin-1') + elif isinstance(o, types.GeneratorType): + return list(o) + elif hasattr(o, '__iter__'): + return list(iter(o)) + raise TypeError(repr(o) + ' is not JSON serializable') + + _kwargs['default'] = _encode + _kwargs['separators'] = (',', ':') + + _kwargs.update(kwargs) + + return json.dumps(obj, **_kwargs) + +def json_decode(s, **kwargs): + return json.loads(s, **kwargs) + +# Platform plugin client. + +class Client(object): + + class NetworkInterfaceException(Exception): pass + class DiscardDataForRequest(NetworkInterfaceException): pass + class RetryDataForRequest(NetworkInterfaceException): pass + class ServerIsUnavailable(RetryDataForRequest): pass + + USER_AGENT = 'ModWsgi-PythonPlugin/%s (Python %s %s)' % ( + '1.0.0', sys.version.split()[0], sys.platform) + + HOST = 'platform-api.newrelic.com' + URL = '/platform/v1/metrics' + + def __init__(self, license_key): + self.license_key = license_key + + def send_request(self, payload=()): + headers = {} + config = {} + + license_key = self.license_key + + if not self.license_key: + license_key = 'INVALID LICENSE KEY' + + headers['User-Agent'] = self.USER_AGENT + headers['Content-Encoding'] = 'identity' + headers['X-License-Key'] = license_key + + try: + data = json_encode(payload) + + except Exception as exc: + _logger.exception('Error encoding data for JSON payload ' + 'with payload of %r.', payload) + + raise Client.DiscardDataForRequest(str(exc)) + + if len(data) > 64*1024: + headers['Content-Encoding'] = 'deflate' + level = (len(data) < 2000000) and 1 or 9 + data = zlib.compress(b(data), level) + + try: + connection = httplib.HTTPSConnection(self.HOST, timeout=30.0) + connection.request('POST', self.URL, data, headers) + response = connection.getresponse() + content = response.read() + + except httplib.HTTPException as exc: + raise Client.RetryDataForRequest(str(exc)) + + finally: + connection.close() + + if response.status != 200: + _logger.debug('Received a non 200 HTTP response from the data ' + 'collector where headers=%r, status=%r and content=%r.', + headers, response.status, content) + + if response.status == 400: + if headers['Content-Encoding'] == 'deflate': + data = zlib.decompress(data) + + _logger.error('Data collector is indicating that a bad ' + 'request has been submitted for headers of %r and ' + 'payload of %r with response of %r.', headers, data, + content) + + raise Client.DiscardDataForRequest() + + elif response.status == 403: + _logger.error('Data 
collector is indicating that the license ' + 'key %r is not valid.', license_key) + + raise Client.DiscardDataForRequest() + + elif response.status == 413: + _logger.warning('Data collector is indicating that a request ' + 'was received where the request content size was over ' + 'the maximum allowed size limit. The length of the ' + 'request content was %d.', len(data)) + + raise Client.DiscardDataForRequest() + + elif response.status in (503, 504): + _logger.warning('Data collector is unavailable.') + + raise Client.ServerIsUnavailable() + + elif response.status != 200: + _logger.warning('An unexpected HTTP response was received ' + 'from the data collector of %r. The payload for ' + 'the request was %r.', respnse.status, payload) + + raise Client.DiscardDataForRequest() + + try: + if PY3: + content = content.decode('UTF-8') + + result = json_decode(content) + + except Exception as exc: + _logger.exception('Error decoding data for JSON payload ' + 'with payload of %r.', content) + + raise Client.DiscardDataForRequest(str(exc)) + + if 'status' in result: + return result['status'] + + error_message = result['error'] + + raise Client.DiscardDataForRequest(error_message) + + def send_metrics(self, name, guid, version, duration, metrics): + agent = {} + agent['host'] = socket.gethostname() + agent['pid'] = os.getpid() + agent['version'] = version or '0.0.0.' + + component = {} + component['name'] = name + component['guid'] = guid + component['duration'] = duration + component['metrics'] = metrics + + payload = {} + payload['agent'] = agent + payload['components'] = [component] + + return self.send_request(payload) diff --git a/src/metrics/newrelic/sampler.py b/src/metrics/newrelic/sampler.py deleted file mode 100644 index ab2e767..0000000 --- a/src/metrics/newrelic/sampler.py +++ /dev/null @@ -1,604 +0,0 @@ -import threading -import atexit -import os -import sys -import json -import socket -import time -import math - -try: - import Queue as queue -except ImportError: - import queue - -import mod_wsgi - -SERVER_READY = '_' -SERVER_STARTING = 'S' -SERVER_BUSY_READ = 'R' -SERVER_BUSY_WRITE = 'W' -SERVER_BUST_KEEPALIVE = 'K' -SERVER_BUSY_LOG = 'L' -SERVER_BUSY_DNS = 'D' -SERVER_CLOSING = 'C' -SERVER_GRACEFUL = 'G' -SERVER_IDLE_KILL = 'I' -SERVER_DEAD = '.' 
- -STATUS_FLAGS = { - SERVER_READY: 'Ready', - SERVER_STARTING: 'Starting', - SERVER_BUSY_READ: 'Read', - SERVER_BUSY_WRITE: 'Write', - SERVER_BUST_KEEPALIVE: 'Keepalive', - SERVER_BUSY_LOG: 'Logging', - SERVER_BUSY_DNS: 'DNS lookup', - SERVER_CLOSING: 'Closing', - SERVER_GRACEFUL: 'Graceful', - SERVER_IDLE_KILL: 'Dying', - SERVER_DEAD: 'Dead' -} - -class Sample(dict): - - def __init__(self, count=0, total=0.0, min=0.0, max=0.0, - sum_of_squares=0.0): - self.count = count - self.total = total - self.min = min - self.max = max - self.sum_of_squares = sum_of_squares - - def __setattr__(self, name, value): - self[name] = value - - def __getattr__(self, name): - return self[name] - - def merge_stats(self, other): - self.total += other.total - self.min = self.count and min(self.min, other.min) or other.min - self.max = max(self.max, other.max) - self.sum_of_squares += other.sum_of_squares - self.count += other.count - - def merge_value(self, value): - self.total += value - self.min = self.count and min(self.min, value) or value - self.max = max(self.max, value) - self.sum_of_squares += value ** 2 - self.count += 1 - -class Samples(object): - - def __init__(self): - self.samples = {} - - def __iter__(self): - return iter(self.samples.items()) - - def sample_name(self, name): - return 'Component/' + name - - def _assign_value(self, value): - if isinstance(value, Sample): - sample = value - self.samples[name] = sample - else: - sample = Sample() - self.samples[name] = sample - sample.merge_value(value) - - return sample - - def assign_value(self, value): - name = self.sample_name(name) - - return self._assign_value(name) - - def _merge_value(self, name, value): - sample = self.samples.get(name) - - if sample is None: - sample = Sample() - self.samples[name] = sample - - if isinstance(value, Sample): - sample.merge_stats(value) - else: - sample.merge_value(value) - - return sample - - def merge_value(self, name, value): - name = self.sample_name(name) - - return self._merge_value(name, value) - - def fetch_sample(self, name): - name = self.sample_name(name) - - sample = self.samples.get(name) - - if sample is None: - sample = Sample() - self.samples[name] = sample - - return sample - - def merge_samples(self, samples): - for name, sample in samples: - self._merge_value(name, sample) - - def assign_samples(self, samples): - for name, sample in samples: - self._assign_value(name, sample) - - def clear_samples(self): - self.samples.clear() - -class Sampler(object): - - guid = 'au.com.dscpl.wsgi.mod_wsgi' - version = '1.0.0' - - def __init__(self, interface, name): - self.interface = interface - self.name = name - - self.running = False - self.lock = threading.Lock() - - self.period_start = 0 - self.access_count = 0 - self.bytes_served = 0 - - self.request_samples = [] - - self.metric_data = Samples() - - self.report_queue = queue.Queue() - - self.report_thread = threading.Thread(target=self.report_main_loop) - self.report_thread.setDaemon(True) - - self.report_start = 0 - self.report_metrics = Samples() - - self.monitor_queue = queue.Queue() - - self.monitor_thread = threading.Thread(target=self.monitor_main_loop) - self.monitor_thread.setDaemon(True) - - self.monitor_count = 0 - - def upload_report(self, start, end, metrics): - try: - self.interface.send_metrics(self.name, self.guid, self.version, - end-start, metrics.samples) - - except self.interface.RetryDataForRequest: - return True - - except Exception: - pass - - return False - - def generate_request_metrics(self, harvest_data): - metrics = 
Samples() - - # Chart as 'Throughput'. - - metrics.merge_value('Requests/Throughput[|requests]', - Sample(count=harvest_data['access_count'], - total=harvest_data['access_count'])) - - # Calculate from the set of sampled requests the average - # and percentile metrics. - - requests = harvest_data['request_samples'] - - if requests: - for request in requests: - # Chart as 'Average'. - - metrics.merge_value('Requests/Response Time[seconds|request]', - request['duration']) - - requests.sort(key=lambda e: e['duration']) - - total = sum([x['duration'] for x in requests]) - - # Chart as 'Average'. - - metrics.merge_value('Requests/Percentiles/Average[seconds]', - total/len(requests)) - - idx50 = int(0.50 * len(requests)) - metrics.merge_value('Requests/Percentiles/Median[seconds]', - requests[idx50]['duration']) - - idx95 = int(0.95 * len(requests)) - metrics.merge_value('Requests/Percentiles/95%[seconds]', - requests[idx95]['duration']) - - idx99 = int(0.99 * len(requests)) - metrics.merge_value('Requests/Percentiles/99%[seconds]', - requests[idx99]['duration']) - - # Chart as 'Rate'. - - metrics.merge_value('Requests/Bytes Served[bytes]', - harvest_data['bytes_served']) - - return metrics - - def generate_process_metrics(self, harvest_data): - metrics = Samples() - - # Chart as 'Count'. Round to Integer. - - metrics.merge_value('Processes/Instances[|processes]', - Sample(count=math.ceil(float( - harvest_data['processes_running']) / - harvest_data['sample_count']))) - - metrics.merge_value('Processes/Lifecycle/Starting[|processes]', - Sample(count=harvest_data['processes_started'])) - - metrics.merge_value('Processes/Lifecycle/Stopping[|processes]', - Sample(count=harvest_data['processes_stopped'])) - - metrics.merge_value('Workers/Availability/Idle[|workers]', - Sample(count=math.ceil(float( - harvest_data['idle_workers']) / - harvest_data['sample_count']))) - metrics.merge_value('Workers/Availability/Busy[|workers]', - Sample(count=math.ceil(float( - harvest_data['busy_workers']) / - harvest_data['sample_count']))) - - # Chart as 'Percentage'. - - metrics.merge_value('Workers/Utilization[server]', - (float(harvest_data['busy_workers']) / - harvest_data['sample_count']) / ( - harvest_data['server_limit']*harvest_data['thread_limit'])) - - total = 0 - for value in harvest_data['worker_status'].values(): - value = float(value)/harvest_data['sample_count'] - total += value - - if total: - for key, value in harvest_data['worker_status'].items(): - if key != SERVER_DEAD and value != 0: - label = STATUS_FLAGS.get(key, 'Unknown') - - # Chart as 'Average'. Round to Integer. - - value = float(value)/harvest_data['sample_count'] - - metrics.merge_value('Workers/Status/%s[workers]' % - label, (value/total)*total) - - return metrics - - def report_main_loop(self): - # We need a set of cached metrics for the case where - # we fail in uploading the metric data and need to - # retain it for the next attempt to upload data. - - retries = 0 - retained_start = 0 - retained = Samples() - - # We simply wait to be passed the metric data to be - # reported for the current sample period. - - while True: - harvest_data = self.report_queue.get() - - # If samples is None then we are being told to - # exit as the process is being shutdown. Otherwise - # we should be passed the cumulative metric data - # and the set of sampled requests. 
- - if harvest_data is None: - return - - start = harvest_data['period_start'] - end = harvest_data['period_end'] - - metrics = harvest_data['metrics'] - - # Add metric to track how many Apache server instances - # are reporting for each sample period. - - # Chart as 'Count'. Round to Integer. - - metrics.merge_value('Server/Instances[|servers]', 0) - - # Generate percentiles metrics for request samples. - - metrics.merge_samples(self.generate_request_metrics(harvest_data)) - metrics.merge_samples(self.generate_process_metrics(harvest_data)) - - # If we had metrics from a previous reporting period - # because we couldn't upload the metric data, we need - # to merge the data from the current reporting period - # with that for the previous period. - - if retained.samples: - start = retained_start - retained.merge_samples(metrics) - metrics = retained - - # Now attempt to upload the metric data. - - retry = self.upload_report(start, end, metrics) - - # If a failure occurred but failure type was such that we - # could try again to upload the data, then retain them. If - # have two many failed attempts though we give up. - - if retry: - retries += 1 - - if retries == 5: - retries = 0 - - else: - retained = metrics - - else: - retries = 0 - - if retries == 0: - retained_start = 0 - retained.clear_samples() - - else: - retained_start = start - retained = metrics - - def generate_scoreboard(self, sample_start=None): - busy_workers = 0 - idle_workers = 0 - access_count = 0 - bytes_served = 0 - - active_processes = 0 - - scoreboard = mod_wsgi.server_metrics() - - if sample_start is None: - sample_start = scoreboard['current_time'] - - scoreboard['request_samples'] = request_samples = [] - - for process in scoreboard['processes']: - process['active_workers'] = 0 - - for worker in process['workers']: - status = worker['status'] - - if not process['quiescing'] and process['pid']: - if (status == SERVER_READY and process['generation'] == - scoreboard['running_generation']): - - process['active_workers'] += 1 - idle_workers += 1 - - elif status not in (SERVER_DEAD, SERVER_STARTING, - SERVER_IDLE_KILL): - - process['active_workers'] += 1 - busy_workers += 1 - - count = worker['access_count'] - - if count or status not in (SERVER_READY, SERVER_DEAD): - access_count += count - bytes_served += worker['bytes_served'] - - current_time = scoreboard['current_time'] - - start_time = worker['start_time'] - stop_time = worker['stop_time'] - - if (stop_time > start_time and sample_start < stop_time - and stop_time <= current_time): - - duration = stop_time - start_time - thread_num = worker['thread_num'] - - request_samples.append(dict(start_time=start_time, - duration=duration, thread_num=thread_num)) - - if process['active_workers']: - active_processes += 1 - - scoreboard['busy_workers'] = busy_workers - scoreboard['idle_workers'] = idle_workers - scoreboard['access_count'] = access_count - scoreboard['bytes_served'] = bytes_served - - scoreboard['active_processes'] = active_processes - - return scoreboard - - def record_process_statistics(self, scoreboard, harvest_data): - current_active_processes = scoreboard['active_processes'] - previous_active_processes = harvest_data['active_processes'] - - harvest_data['active_processes'] = current_active_processes - harvest_data['processes_running'] += current_active_processes - - if current_active_processes > previous_active_processes: - harvest_data['processes_started'] += (current_active_processes - - previous_active_processes) - - elif current_active_processes < 
previous_active_processes: - harvest_data['processes_stopped'] += (previous_active_processes - - current_active_processes) - - harvest_data['idle_workers'] += scoreboard['idle_workers'] - harvest_data['busy_workers'] += scoreboard['busy_workers'] - - for process in scoreboard['processes']: - for worker in process['workers']: - harvest_data['worker_status'][worker['status']] += 1 - - def monitor_main_loop(self): - scoreboard = self.generate_scoreboard() - - harvest_start = scoreboard['current_time'] - sample_start = harvest_start - sample_duration = 0.0 - - access_count = scoreboard['access_count'] - bytes_served = scoreboard['bytes_served'] - - harvest_data = {} - - harvest_data['sample_count'] = 0 - harvest_data['period_start'] = harvest_start - - harvest_data['metrics'] = Samples() - - harvest_data['request_samples'] = [] - - harvest_data['active_processes'] = 0 - - harvest_data['processes_running'] = 0 - harvest_data['processes_started'] = 0 - harvest_data['processes_stopped'] = 0 - - harvest_data['idle_workers'] = 0 - harvest_data['busy_workers'] = 0 - - harvest_data['server_limit'] = scoreboard['server_limit'] - harvest_data['thread_limit'] = scoreboard['thread_limit'] - - harvest_data['worker_status'] = {} - - for status in STATUS_FLAGS.keys(): - harvest_data['worker_status'][status] = 0 - - harvest_data['access_count'] = 0 - harvest_data['bytes_served'] = 0 - - # Chart as 'Count'. Round to Integer. - - harvest_data['metrics'].merge_value('Server/Restarts[|servers]', 0) - - start = time.time() - end = start + 60.0 - - while True: - try: - # We want to collect metrics on a regular second - # interval so we need to align the timeout value. - - now = time.time() - start += 1.0 - timeout = start - now - - return self.monitor_queue.get(timeout=timeout) - - except queue.Empty: - pass - - harvest_data['sample_count'] += 1 - - scoreboard = self.generate_scoreboard(sample_start) - - harvest_end = scoreboard['current_time'] - sample_end = harvest_end - - sample_duration = sample_end - sample_start - - self.record_process_statistics(scoreboard, harvest_data) - - harvest_data['request_samples'].extend( - scoreboard['request_samples']) - - access_count_delta = scoreboard['access_count'] - access_count_delta -= access_count - access_count = scoreboard['access_count'] - - harvest_data['access_count'] += access_count_delta - - bytes_served_delta = scoreboard['bytes_served'] - bytes_served_delta -= bytes_served - bytes_served = scoreboard['bytes_served'] - - harvest_data['bytes_served'] += bytes_served_delta - - now = time.time() - - if now >= end: - harvest_data['period_end'] = harvest_end - - self.report_queue.put(harvest_data) - - harvest_start = harvest_end - end += 60.0 - - _harvest_data = {} - - _harvest_data['sample_count'] = 0 - _harvest_data['period_start'] = harvest_start - - _harvest_data['metrics'] = Samples() - - _harvest_data['request_samples'] = [] - - _harvest_data['active_processes'] = ( - harvest_data['active_processes']) - - _harvest_data['processes_running'] = 0 - _harvest_data['processes_started'] = 0 - _harvest_data['processes_stopped'] = 0 - - _harvest_data['idle_workers'] = 0 - _harvest_data['busy_workers'] = 0 - - _harvest_data['server_limit'] = scoreboard['server_limit'] - _harvest_data['thread_limit'] = scoreboard['thread_limit'] - - _harvest_data['worker_status'] = {} - - for status in STATUS_FLAGS.keys(): - _harvest_data['worker_status'][status] = 0 - - _harvest_data['access_count'] = 0 - _harvest_data['bytes_served'] = 0 - - harvest_data = _harvest_data - - sample_start 
= sample_end - - def terminate(self): - try: - self.report_queue.put(None) - self.monitor_queue.put(None) - except Exception: - pass - - self.monitor_thread.join() - self.report_thread.join() - - def start(self): - if mod_wsgi.server_metrics() is None: - return - - with self.lock: - if not self.running: - self.running = True - atexit.register(self.terminate) - self.monitor_thread.start() - self.report_thread.start() diff --git a/src/metrics/sampler.py b/src/metrics/sampler.py new file mode 100644 index 0000000..754b2b0 --- /dev/null +++ b/src/metrics/sampler.py @@ -0,0 +1,125 @@ +import threading +import atexit +import time + +try: + import Queue as queue +except ImportError: + import queue + +from .scoreboard import Scoreboard + +import mod_wsgi + +class Sampler(object): + + sample_interval = 1.0 + report_interval = 60.0 + + def __init__(self): + self.running = False + self.lock = threading.Lock() + + self.sampler_queue = queue.Queue() + + self.sampler_thread = threading.Thread(target=self.sampler_loop) + self.sampler_thread.setDaemon(True) + + self.consumer_queue = queue.Queue() + + self.consumer_thread = threading.Thread(target=self.consumer_loop) + self.consumer_thread.setDaemon(True) + + self.consumers = [] + + def register(self, callback): + self.consumers.append(callback) + + def consumer_loop(self): + while True: + scoreboard = self.consumer_queue.get() + + for consumer in self.consumers: + consumer(scoreboard) + + if scoreboard.sampler_exiting: + return + + def distribute(self, scoreboard): + self.consumer_queue.put(scoreboard) + + def sampler_loop(self): + scoreboard = Scoreboard() + + scheduled_time = time.time() + period_end_time = scheduled_time + self.report_interval + + while True: + try: + # We want to collect metrics on a regular second + # interval so we need to align the timeout value. + + now = time.time() + scheduled_time += self.sample_interval + timeout = max(0, scheduled_time - now) + + self.sampler_queue.get(timeout=timeout) + + # If we get here we have been notified to exit. + # We update the scoreboard one last time and then + # distribute it to any consumers. + + scoreboard.update(rollover=True, exiting=True) + + self.distribute(scoreboard) + + return + + except queue.Empty: + pass + + # Update the scoreboard for the current sampling period. + # Need to check first whether after we will be rolling it + # over for next sampling period as well so can do any + # special end of sampling period actions. + + now = time.time() + + if now >= period_end_time: + scoreboard.update(rollover=True) + + # Distribute scoreboard to any consumers. It + # is expected that they will read but not update + # as same instance is used for all. + + self.distribute(scoreboard) + + period_end_time += self.report_interval + + # Rollover to a new scoreboard for the next + # sampling period. 
+ + scoreboard = scoreboard.rollover() + + else: + scoreboard.update(rollover=False) + + def terminate(self): + try: + self.sampler_queue.put(None) + except Exception: + pass + + self.sampler_thread.join() + self.consumer_thread.join() + + def start(self): + if mod_wsgi.server_metrics() is None: + return + + with self.lock: + if not self.running: + self.running = True + atexit.register(self.terminate) + self.sampler_thread.start() + self.consumer_thread.start() diff --git a/src/metrics/scoreboard.py b/src/metrics/scoreboard.py new file mode 100644 index 0000000..df37ac9 --- /dev/null +++ b/src/metrics/scoreboard.py @@ -0,0 +1,568 @@ +import copy +import math +import psutil + +from collections import namedtuple + +from mod_wsgi import server_metrics as raw_server_metrics + +SERVER_READY = '_' +SERVER_STARTING = 'S' +SERVER_BUSY_READ = 'R' +SERVER_BUSY_WRITE = 'W' +SERVER_BUST_KEEPALIVE = 'K' +SERVER_BUSY_LOG = 'L' +SERVER_BUSY_DNS = 'D' +SERVER_CLOSING = 'C' +SERVER_GRACEFUL = 'G' +SERVER_IDLE_KILL = 'I' +SERVER_DEAD = '.' + +WORKER_STATUS = { + SERVER_READY: 'Ready', + SERVER_STARTING: 'Starting', + SERVER_BUSY_READ: 'Read', + SERVER_BUSY_WRITE: 'Write', + SERVER_BUST_KEEPALIVE: 'Keepalive', + SERVER_BUSY_LOG: 'Logging', + SERVER_BUSY_DNS: 'DNS lookup', + SERVER_CLOSING: 'Closing', + SERVER_GRACEFUL: 'Graceful', + SERVER_IDLE_KILL: 'Dying', + SERVER_DEAD: 'Dead' +} + +def server_metrics(): + """Returns server metrics, which are a combination of data from the + raw mod_wsgi server metrics, along with further data derived from + that raw data. + + """ + + workers_busy = 0 + workers_idle = 0 + + access_count = 0 + bytes_served = 0 + + active_processes = 0 + + # Grab the raw server metrics. + + result = raw_server_metrics() + + # Loop over all the processes and workers they contain aggregating + # various details. + + for process in result['processes']: + process['active_workers'] = 0 + + for worker in process['workers']: + # Here we determine whether a worker is busy or idle. + + status = worker['status'] + + if not process['quiescing'] and process['pid']: + if (status == SERVER_READY and process['generation'] == + result['running_generation']): + + process['active_workers'] += 1 + workers_idle += 1 + + elif status not in (SERVER_DEAD, SERVER_STARTING, + SERVER_IDLE_KILL): + + process['active_workers'] += 1 + workers_busy += 1 + + # Here we aggregate number of requests served and + # amount of bytes transferred. + + count = worker['access_count'] + + if count or status not in (SERVER_READY, SERVER_DEAD): + access_count += count + bytes_served += worker['bytes_served'] + + if process['active_workers']: + active_processes += 1 + + result['workers_busy'] = workers_busy + result['workers_idle'] = workers_idle + + result['access_count'] = access_count + result['bytes_served'] = bytes_served + + result['active_processes'] = active_processes + + return result + +RequestSample = namedtuple('RequestSample', 'start_time duration') + +class Scoreboard(object): + + """Container for holding selected server metrics accumulated from + multiple samples making up a sampling period. + + """ + + system_frequency = 1 + + def __init__(self): + # Setup the starting values. We need to grab an initial + # set of server metrics as a reference point for certain + # values. + + data = server_metrics() + + # Start of the period will be the time we just generated + # the initial server metrics used as a reference. 
+ + self.period_start = data['current_time'] + + # The current end time for the period always starts out + # as the same as the start time. + + self.period_end = self.period_start + + # Sample periods count tracks how many consecutive sample + # periods have been run which have been chained together. + + self.sample_periods = 1 + + # Sample count tracks how many samples have been collected + # against this sample period. + + self.sample_count = 0 + + # Sampler exiting flag indicates whether this is the final + # sampling period to be reported on due to the sampler + # exiting due to process shutdown or some other event. + + self.sampler_exiting = False + + # The server and thread limits are the maximum number of + # processes and workers per process that can be created. + # In practice the number of workers per process is always + # fixed at the thread limit as Apache doesn't dynamically + # adjust the number of running workers per process and + # instead always creates the maximum number and leaves it + # at that for the life of the process. + + self.server_limit = data['server_limit'] + self.thread_limit = data['thread_limit'] + + # Active processes is how many Apache child processes + # currently contain active workers. This is used between + # samples, to determine whether relative to the last + # sample, the number of processes increased or decreased. + + self.active_processes = 0 + + # Running counters of the total number of running, starting + # or stopped processes across all samples. The count of + # running processes is used to determine the average number + # of processes running for the whole sample period. The + # counts of starting and stopping are used in reflecting + # the amount of process churn. + + self.processes_running_count = 0 + self.processes_started_count = 0 + self.processes_stopped_count = 0 + + # Running counters of the total number of idle and busy + # workers across all samples. These counts are used to + # detemine the average number of workers in each state + # for the whole sample period. + + self.workers_idle_count = 0 + self.workers_busy_count = 0 + + # Running counters of the actual workers statuses across + # all samples. These counts are used to detemine the + # average number of workers in each state for the whole + # sample period. The statues are a more fine grained + # depiction of the worker state compared to the summary + # state of idle or busy. + + self.workers_status_count = dict.fromkeys(WORKER_STATUS.keys(), 0) + + # Access count is the number of completed requests that + # have been handled by Apache. We have the total and a + # delta for the current sampling period. + + self.access_count_total = data['access_count'] + self.access_count_delta = 0 + + # Bytes served is the number of bytes which have been + # transferred by Apache. We have the total and a delta + # for the current sampling period. + + self.bytes_served_total = data['bytes_served'] + self.bytes_served_delta = 0 + + # Request samples is a list of details for a subset of + # requests derived from the server metrics. It is not + # possible to collect the details of every request. We + # can only even get samples where we see a worker, at the + # time of the sample, which hasn't yet started a new + # request and so can extract the details from the last + # request that the worker handled. If a worker is + # handling multiple requests between sample periods, we + # also only get the opportunity to see the details for + # the last one handled. 
The number of request samples + # should be bounded by the number of workers times the + # number of samples in the sample period. + + self.request_samples = [] + + # Process system info records details of any processes + # such as memory, CPU usage and context switches. + + self.processes_system_info = {} + + @property + def duration(self): + """The duration of the sampling period. + + """ + + return self.period_end - self.period_start + + @property + def processes_running(self): + if self.sample_count == 0: + return 0 + + return math.ceil(float(self.processes_running_count) / + self.sample_count) + + @property + def workers_idle(self): + if self.sample_count == 0: + return 0 + + return math.ceil(float(self.workers_idle_count) / self.sample_count) + + @property + def workers_busy(self): + if self.sample_count == 0: + return 0 + + return math.ceil(float(self.workers_busy_count) / self.sample_count) + + @property + def workers_utilization(self): + if self.sample_count == 0: + return 0 + + return (float(self.workers_busy_count) / self.sample_count) / ( + self.server_limit * self.thread_limit) + + @property + def workers_status(self): + result = {} + + if self.sample_count == 0: + return result + + total = 0 + + for value in self.workers_status_count.values(): + value = float(value) / self.sample_count + total += value + + if total: + for key, value in self.workers_status_count.items(): + if key != SERVER_DEAD and value != 0: + label = WORKER_STATUS.get(key, 'Unknown') + value = float(value) / self.sample_count + result[label] = (value / total) * total + + return result + + @property + def request_percentiles(self): + result = {} + + # Calculate from the set of sampled requests the average + # and percentile metrics. + + requests = self.request_samples + + if requests: + requests.sort(key=lambda e: e.duration) + + total = sum([x.duration for x in requests]) + + # Chart as 'Average'. + + result['Average'] = total/len(requests) + + idx50 = int(0.50 * len(requests)) + result['Median'] = requests[idx50].duration + + idx95 = int(0.95 * len(requests)) + result['95%'] = requests[idx95].duration + + idx99 = int(0.99 * len(requests)) + result['99%'] = requests[idx99].duration + + return result + + @property + def request_samples_quality(self): + if self.access_count_delta == 0: + return 0.0 + + return float(len(self.request_samples)) / self.access_count_delta + + def update(self, rollover=False,exiting=False): + """Updates the scoreboard values for the current sampling + period by incorporating current server metrics. + + """ + + # Grab the current server metrics. + + data = server_metrics() + + # Update times for current sampling period and number of + # samples taken. + + sample_start = self.period_end + sample_end = data['current_time'] + sample_duration = max(0, sample_end - sample_start) + + self.period_end = sample_end + + # Calculate changes in access count and bytes served since + # the last sample. + + access_count_total = data['access_count'] + access_count_delta = access_count_total - self.access_count_total + + self.access_count_delta += access_count_delta + self.access_count_total = access_count_total + + bytes_served_total = data['bytes_served'] + bytes_served_delta = bytes_served_total - self.bytes_served_total + + self.bytes_served_delta += bytes_served_delta + self.bytes_served_total = bytes_served_total + + # Collect request samples. The requests must have completed + # since the last sample time and the worker must not have + # already started on a new request. 
+ + for process in data['processes']: + for worker in process['workers']: + start_time = worker['start_time'] + stop_time = worker['stop_time'] + + if (stop_time > start_time and sample_start < stop_time + and stop_time <= sample_end): + + self.request_samples.append(RequestSample( + start_time=start_time, + duration=stop_time-start_time)) + + # Calculate changes in the number of active, starting and + # stopping processes, and the number of idle and busy workers. + + current_active_processes = data['active_processes'] + previous_active_processes = self.active_processes + + self.active_processes = current_active_processes + self.processes_running_count += current_active_processes + + if current_active_processes > previous_active_processes: + self.processes_started_count += (current_active_processes - + previous_active_processes) + + elif current_active_processes < previous_active_processes: + self.processes_stopped_count += (previous_active_processes - + current_active_processes) + + self.workers_idle_count += data['workers_idle'] + self.workers_busy_count += data['workers_busy'] + + for process in data['processes']: + for worker in process['workers']: + self.workers_status_count[worker['status']] += 1 + + # Record details about state of processes. + + if self.sample_count % self.system_frequency == 0 or rollover: + + # First we mark all process entries as being dead. We + # will then mark as alive those which truly are. + + for details in self.processes_system_info.values(): + details['dead'] = True + + for process in data['processes']: + pid = process['pid'] + + if pid == 0: + continue + + details = self.processes_system_info.get(pid) + + if details is None: + details = dict(pid=pid) + + details['duration'] = 0.0 + + details['cpu_times'] = None + details['cpu_user_time'] = 0.0 + details['cpu_system_time'] = 0.0 + + details['ctx_switches'] = None + details['ctx_switch_voluntary'] = 0 + details['ctx_switch_involuntary'] = 0 + + details['dead'] = False + + try: + p = psutil.Process(pid) + + except psutil.NoSuchProcess: + details['dead'] = True + + continue + + try: + rss, vms = p.memory_info() + + details['memory_rss'] = rss + details['memory_vms'] = vms + + except psutil.AccessDenied: + details['dead'] = True + + continue + + except Exception: + raise + + try: + cpu_times = p.cpu_times() + + if details['cpu_times'] is None: + details['cpu_times'] = cpu_times + + # Note that we don't want to baseline CPU usage + # at zero the first time we see the process, as we + # want to capture any work performed in doing any + # startup initialisation of the process. This + # would occur before the first time we see it. + # Thus populate CPU usage with the initial values. + # Is slight risk that we may in part apportion + # this to the wrong sampling period if didn't fall + # within the sample, but nothing we can do about + # that. 
+ + details['cpu_user_time'] = cpu_times[0] + details['cpu_system_time'] = cpu_times[1] + + else: + user_time = cpu_times[0] - details['cpu_times'][0] + system_time = cpu_times[1] - details['cpu_times'][1] + + details['cpu_times'] = cpu_times + details['cpu_user_time'] += user_time + details['cpu_system_time'] += system_time + + except psutil.AccessDenied: + details['dead'] = True + + continue + + except Exception: + raise + + try: + ctx_switches = p.num_ctx_switches() + + if details['ctx_switches'] is None: + details['ctx_switches'] = ctx_switches + + else: + voluntary = (ctx_switches.voluntary - + details['ctx_switches'].voluntary) + involuntary = (ctx_switches.involuntary - + details['ctx_switches'].involuntary) + + details['ctx_switches'] = ctx_switches + details['ctx_switch_voluntary'] += voluntary + details['ctx_switch_involuntary'] += involuntary + + except psutil.AccessDenied: + details['dead'] = True + + continue + + except Exception: + raise + + details['duration'] += sample_duration + + self.processes_system_info[pid] = details + + # Update the flag indicating whether the sampler is exiting + # and this is the final sampling period data to be supplied. + + self.sampler_exiting = exiting + + self.sample_count += 1 + + def rollover(self): + """Creates a copy of the current scoreboard and resets any + attributes back to initial values where appropriate for the + start of a new sampling period. + + """ + + # Create a copy. A shallow copy is enough. + + scoreboard = copy.deepcopy(self) + + # Reset selected attributes back to initial values. + + scoreboard.period_start = scoreboard.period_end + + scoreboard.sample_count = 0; + + scoreboard.access_count_delta = 0 + scoreboard.bytes_served_delta = 0 + + scoreboard.processes_running_count = 0 + scoreboard.processes_started_count = 0 + scoreboard.processes_stopped_count = 0 + + scoreboard.workers_idle_count = 0 + scoreboard.workers_busy_count = 0 + + scoreboard.workers_status_count = dict.fromkeys( + WORKER_STATUS.keys(), 0) + + scoreboard.request_samples = [] + + # For record of processes, we want to remove just the dead ones. + + for pid, details in list(scoreboard.processes_system_info.items()): + if details['dead']: + del scoreboard.processes_system_info[pid] + else: + details['duration'] = 0.0 + details['cpu_user_time'] = 0.0 + details['cpu_system_time'] = 0.0 + details['ctx_switch_voluntary'] = 0 + details['ctx_switch_involuntary'] = 0 + + # Increment the count of successive sampling periods. + + scoreboard.sample_periods += 1 + + return scoreboard diff --git a/src/metrics/statistics.py b/src/metrics/statistics.py new file mode 100644 index 0000000..9ad7bf0 --- /dev/null +++ b/src/metrics/statistics.py @@ -0,0 +1,89 @@ +import copy + +class Stats(dict): + + def __init__(self, count=0, total=0.0, min=0.0, max=0.0, + sum_of_squares=0.0): + + # Attribute names here must not change as this is what + # New Relic uses. Easier to adopt that convention rather + # than something slightly different. 
+ + self.count = count + self.total = total + self.min = min + self.max = max + self.sum_of_squares = sum_of_squares + + def __setattr__(self, name, value): + self[name] = value + + def __getattr__(self, name): + return self[name] + + def merge_stats(self, other): + self.total += other.total + self.min = self.count and min(self.min, other.min) or other.min + self.max = max(self.max, other.max) + self.sum_of_squares += other.sum_of_squares + self.count += other.count + + def merge_value(self, value): + self.total += value + self.min = self.count and min(self.min, value) or value + self.max = max(self.max, value) + self.sum_of_squares += value ** 2 + self.count += 1 + +class Metrics(object): + + def __init__(self): + self.metrics = {} + + def __iter__(self): + return iter(self.metrics.items()) + + def __len__(self): + return len(self.metrics) + + def assign_value(self, name, value): + if isinstance(value, Stats): + sample = copy.copy(value) + self.metrics[name] = sample + else: + sample = Stats() + self.metrics[name] = sample + sample.merge_value(value) + + return sample + + def merge_value(self, name, value): + sample = self.fetch_stats(name) + + if isinstance(value, Stats): + sample.merge_stats(value) + else: + sample.merge_value(value) + + return sample + + def fetch_stats(self, name): + sample = self.metrics.get(name) + + if sample is None: + sample = Stats() + self.metrics[name] = sample + + return sample + + def merge_metrics(self, metrics): + for name, stats in metrics: + self.merge_value(name, stats) + + def assign_metrics(self, metrics): + for name, stats in metrics: + self.assign_value(name, stats) + + def clear_metrics(self): + self.metrics.clear() + -- cgit v1.2.1
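The refactoring above leaves the New Relic Agent as just one consumer registered against the generic Sampler, so other reporting backends can hook into the same scoreboard data. Below is a minimal sketch of such a consumer. It assumes the src/metrics layout in this patch is importable as a 'metrics' package (the installed package name may differ) and that it runs in a mod_wsgi process where mod_wsgi.server_metrics() returns data; the logging consumer itself is hypothetical.

    import logging

    from metrics.sampler import Sampler  # import path assumed from the src/ layout above

    _logger = logging.getLogger(__name__)

    def log_scoreboard(scoreboard):
        # Called once per reporting interval, and once more at shutdown
        # with scoreboard.sampler_exiting set, with the accumulated data.
        _logger.info('processes=%d busy=%d idle=%d requests=%d',
                scoreboard.processes_running, scoreboard.workers_busy,
                scoreboard.workers_idle, scoreboard.access_count_delta)

    sampler = Sampler()
    sampler.register(log_scoreboard)

    # start() silently does nothing unless mod_wsgi.server_metrics()
    # is available, mirroring the guard in the Sampler class above.
    sampler.start()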
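The new Stats and Metrics classes accumulate values in the count/total/min/max/sum_of_squares form the New Relic platform API expects, which is what Agent.record() builds on when it prefixes metric names with 'Component/'. A short illustration of that accumulation, using made-up values and the same assumed 'metrics' package name:

    from metrics.statistics import Metrics, Stats

    metrics = Metrics()

    # Merging plain values grows the aggregate statistics for the metric.
    metrics.merge_value('Component/Requests/Response Time[seconds|request]', 0.25)
    metrics.merge_value('Component/Requests/Response Time[seconds|request]', 0.75)

    # Merging a Stats instance folds in a pre-aggregated sample instead.
    metrics.merge_value('Component/Requests/Throughput[|requests]',
            Stats(count=42, total=42))

    for name, stats in metrics:
        print(name, dict(stats))

    # The response time entry ends up with count=2, total=1.0, min=0.25,
    # max=0.75 and sum_of_squares=0.625.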