author    Graham Dumpleton <Graham.Dumpleton@gmail.com>  2014-06-17 16:19:40 +1000
committer Graham Dumpleton <Graham.Dumpleton@gmail.com>  2014-06-17 16:19:40 +1000
commit    868373bfb40809d681a6981872f7af68e2aaadc8 (patch)
tree      0a10f55f47716fc054bda848745dffb18a146d48
parent    1e0f2d65ad5a22b1ae945f7a707a2d31bca50744 (diff)
download  mod_wsgi-metrics-868373bfb40809d681a6981872f7af68e2aaadc8.tar.gz
Refactor to start splitting generic metrics collection from New Relic platform reporting. Also added new capabilities to monitor processes. (tag: 1.1.0)
-rw-r--r--  src/metrics/newrelic/agent.py                                           330
-rw-r--r--  src/metrics/newrelic/platform.py (renamed from src/metrics/newrelic/interface.py)   22
-rw-r--r--  src/metrics/newrelic/sampler.py                                         604
-rw-r--r--  src/metrics/sampler.py                                                  125
-rw-r--r--  src/metrics/scoreboard.py                                               568
-rw-r--r--  src/metrics/statistics.py                                                89
6 files changed, 1071 insertions, 667 deletions
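
The following sketch (not part of the commit) shows how the refactored pieces in this diff are meant to fit together: the generic Sampler in src/metrics/sampler.py periodically builds a Scoreboard and hands it to every registered consumer, and the New Relic Agent becomes just one such consumer. The import paths assume the package is importable as 'metrics', and the application name and license key shown are placeholders.

    from metrics.sampler import Sampler
    from metrics.newrelic.agent import Agent

    # One generic sampler can feed several consumers.
    sampler = Sampler()

    # The agent registers its process() callback with the sampler it is
    # given, making New Relic reporting just one consumer of the data.
    agent = Agent(sampler=sampler, app_name='My Application',
            license_key='LICENSE-KEY')

    # A plain consumer which only logs a few of the scoreboard values.
    def log_consumer(scoreboard):
        print('processes=%d idle=%d busy=%d requests=%d' % (
                scoreboard.processes_running, scoreboard.workers_idle,
                scoreboard.workers_busy, scoreboard.access_count_delta))

    sampler.register(log_consumer)

    # Starting the sampler does nothing unless mod_wsgi server metrics
    # are available for this process.
    sampler.start()
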
diff --git a/src/metrics/newrelic/agent.py b/src/metrics/newrelic/agent.py
index 2b4b74c..86ac388 100644
--- a/src/metrics/newrelic/agent.py
+++ b/src/metrics/newrelic/agent.py
@@ -1,5 +1,7 @@
import os
import logging
+import math
+import psutil
try:
from ConfigParser import RawConfigParser, NoOptionError, NoSectionError
@@ -8,79 +10,93 @@ except ImportError:
import mod_wsgi
-from .interface import Interface
-from .sampler import Sampler
+from .platform import Client
+from ..sampler import Sampler
+from ..statistics import Metrics, Stats
_logger = logging.getLogger(__name__)
-class Agent(object):
+def configuration_settings(app_name=None, license_key=None,
+ config_file=None, environment=None):
- def __init__(self, app_name=None, license_key=None, config_file=None,
- environment=None):
+ if config_file is None:
+ config_file = os.environ.get('NEW_RELIC_CONFIG_FILE', None)
- self.sampler = None
+ if config_file is not None:
+ config_object = RawConfigParser()
- if mod_wsgi.version < (4, 2, 0):
- _logger.fatal('Version 4.2.0 or newer of mod_wsgi is required '
- 'for running the New Relic platform plugin. The plugin '
- 'has been disabled.')
+ if config_file:
+ config_object.read([config_file])
- return
+ if environment is None:
+ environment = os.environ.get('NEW_RELIC_ENVIRONMENT', None)
+
+ def _option(name, section='newrelic', type=None, **kwargs):
+ try:
+ getter = 'get%s' % (type or '')
+ return getattr(config_object, getter)(section, name)
+ except NoOptionError:
+ if 'default' in kwargs:
+ return kwargs['default']
+ else:
+ raise
- if config_file is None:
- config_file = os.environ.get('NEW_RELIC_CONFIG_FILE', None)
+ def option(name, type=None, **kwargs):
+ sections = []
- if config_file is not None:
- config_object = RawConfigParser()
+ if environment is not None:
+ sections.append('newrelic-platform:%s' % environment)
- if config_file:
- config_object.read([config_file])
+ sections.append('newrelic-platform')
- if environment is None:
- environment = os.environ.get('NEW_RELIC_ENVIRONMENT', None)
+ if environment is not None:
+ sections.append('newrelic:%s' % environment)
- def _option(name, section='newrelic', type=None, **kwargs):
+ sections.append('newrelic')
+
+ for section in sections:
try:
- getter = 'get%s' % (type or '')
- return getattr(config_object, getter)(section, name)
- except NoOptionError:
- if 'default' in kwargs:
- return kwargs['default']
- else:
- raise
+ return _option(name, section, type)
+ except (NoOptionError, NoSectionError):
+ pass
- def option(name, type=None, **kwargs):
- sections = []
+ if 'default' in kwargs:
+ return kwargs['default']
- if environment is not None:
- sections.append('newrelic-platform:%s' % environment)
+ if app_name is None:
+ app_name = os.environ.get('NEW_RELIC_APP_NAME', None)
+ app_name = option('app_name', default=app_name)
- sections.append('newrelic-platform')
+ if license_key is None:
+ license_key = os.environ.get('NEW_RELIC_LICENSE_KEY', None)
+ license_key = option('license_key', default=license_key)
- if environment is not None:
- sections.append('newrelic:%s' % environment)
+ if app_name is not None:
+ app_name = app_name.split(';')[0].strip()
- sections.append('newrelic')
+ return app_name, license_key
- for section in sections:
- try:
- return _option(name, section, type)
- except (NoOptionError, NoSectionError):
- pass
+class Agent(object):
- if 'default' in kwargs:
- return kwargs['default']
+ guid = 'au.com.dscpl.wsgi.mod_wsgi'
+ version = '1.1.0'
+
+ max_retries = 10
- if app_name is None:
- app_name = os.environ.get('NEW_RELIC_APP_NAME', None)
- app_name = option('app_name', default=app_name)
+ def __init__(self, sampler=None, app_name=None, license_key=None,
+ config_file=None, environment=None):
- if license_key is None:
- license_key = os.environ.get('NEW_RELIC_LICENSE_KEY', None)
- license_key = option('license_key', default=license_key)
+ self.sampler = None
- if app_name is not None:
- app_name = app_name.split(';')[0].strip()
+ if mod_wsgi.version < (4, 2, 0):
+ _logger.fatal('Version 4.2.0 or newer of mod_wsgi is required '
+ 'for running the New Relic platform plugin. The plugin '
+ 'has been disabled.')
+
+ return
+
+ app_name, license_key = configuration_settings(app_name,
+ license_key, config_file, environment)
if not license_key or not app_name:
_logger.fatal('Either the license key or application name was '
@@ -91,8 +107,218 @@ class Agent(object):
_logger.info('New Relic platform plugin reporting to %r.', app_name)
- self.interface = Interface(license_key)
- self.sampler = Sampler(self.interface, app_name)
+ self.client = Client(license_key)
+
+ self.license_key = license_key
+ self.app_name = app_name
+
+ self.sampler = sampler or Sampler()
+
+ self.sampler.register(self.process)
+
+ self.metrics = Metrics()
+ self.epoch = None
+ self.retries = 0
+
+ def upload(self, metrics, duration):
+ try:
+ self.client.send_metrics(self.app_name, self.guid, self.version,
+ duration, metrics)
+
+ except self.client.RetryDataForRequest:
+ return True
+
+ except Exception:
+ pass
+
+ return False
+
+ def record(self, name, value):
+ name = 'Component/' + name
+ self.metrics.merge_value(name, value)
+
+ def rollover(self):
+ self.metrics = Metrics()
+ self.epoch = None
+ self.retries = 0
+
+ def process(self, scoreboard):
+ # Record metric to track how many Apache server instances are
+ # reporting. The 'Server/Instances' metric should be charted as
+ # a 'Count', rounded to an 'Integer'.
+
+ self.record('Server/Instances[|servers]', 0)
+
+ # If this is the first sampling period, take that to mean that
+ # this is a new process and Apache was just (re)started. If we
+ # are being told the sampler is exiting, we take it that Apache is
+ # being shut down. Both can show up if Apache is shut down during
+ # the first sampling period. The 'Server/Lifecycle' metrics should
+ # be charted as a 'Count', rounded to an 'Integer'.
+
+ if scoreboard.sample_periods == 1:
+ self.record('Server/Lifecycle/Starting[|servers]', 0)
+
+ if scoreboard.sampler_exiting:
+ self.record('Server/Lifecycle/Stopping[|servers]', 0)
+
+ # Record metric to track how many processes are in use. This is
+ # calculated as an average of the total number which were
+ # reported in use in each individual sample. The
+ # 'Processes/Instances' metric should be charted as a 'Count',
+ # rounded to an 'Integer'.
+
+ self.record('Processes/Instances[|processes]', Stats(
+ count=scoreboard.processes_running))
+
+ # Also separately record how many processes were counted as
+ # having been started or stopped in the sampling period. These
+ # would be used to represent the amount of process churn which
+ # is occurring due to Apache's dynamic management of the number
+ # of processes. The 'Processes/Lifecycle' metrics should be
+ # charted as a 'Count', rounded to an 'Integer'.
+
+ self.record('Processes/Lifecycle/Starting[|processes]',
+ Stats(count=scoreboard.processes_started_count))
+
+ self.record('Processes/Lifecycle/Stopping[|processes]',
+ Stats(count=scoreboard.processes_stopped_count))
+
+ # Record metric to track how many workers are in idle and busy
+ # states. This is calculated as an average from the total number
+ # which were reported in each state in each individual sample.
+ # The 'Workers/Availability' metrics should be charted as a
+ # 'Count', rounded to an 'Integer'.
+
+ self.record('Workers/Availability/Idle[|workers]', Stats(
+ count=scoreboard.workers_idle))
+ self.record('Workers/Availability/Busy[|workers]', Stats(
+ count=scoreboard.workers_busy))
+
+ # Record metric to track more fine grained status of each
+ # worker. This is calculated as an average from the total number
+ # which were reported in each state in each individual sample.
+ # The 'Workers/Status' metrics should be charted as 'Average'
+ # value, rounded to an 'Integer'.
+
+ for label, value in scoreboard.workers_status.items():
+ self.record('Workers/Status/%s[workers]' % label, value)
+
+ # Record metric to track the utilisation of the server. The
+ # 'Workers/Utilization' metric should be charted as 'Average
+ # value', with number format of 'Percentage'.
+
+ self.record('Workers/Utilization[server]',
+ scoreboard.workers_utilization)
+
+ # Record metric to track the request throughput. The
+ # 'Requests/Throughput' metric should be charted as 'Throughput'.
+
+ self.record('Requests/Throughput[|requests]', Stats(
+ count=scoreboard.access_count_delta,
+ total=scoreboard.access_count_delta))
+
+ # Record metric to track number of bytes served up. This is
+ # believed only to be from response content. There is no known
+ # separate measure for bytes uploaded. The 'Requests/Bytes Served'
+ # metric should be charted as 'Rate'.
+
+ self.record('Requests/Bytes Served[bytes]',
+ scoreboard.bytes_served_delta)
+
+ # Record metric to track request response time. This is
+ # calculated as an average from the request samples. That is, it
+ # is not across all requests. The 'Requests/Response Time'
+ # metric should be charted as 'Average'.
+
+ for request in scoreboard.request_samples:
+ self.record('Requests/Response Time[seconds|request]',
+ request.duration)
+
+ # Record metric to track the percentile breakdown of request
+ # response time. This is calculated from the request samples
+ # only, and so is not across all requests. The
+ # 'Requests/Percentiles' metric should be charted as 'Average'.
+
+ for label, value in scoreboard.request_percentiles.items():
+ self.record('Requests/Percentiles/%s[seconds]' % label, value)
+
+ # Record metric to track what percentage of all requests were
+ # captured as samples. The 'Requests/Sample Quality' metric
+ # should be charted as 'Average' converted to a 'Percentage'.
+
+ self.record('Requests/Sample Quality[requests]',
+ scoreboard.request_samples_quality)
+
+ user_time = 0.0
+ system_time = 0.0
+
+ memory_rss = 0
+
+ for process in scoreboard.processes_system_info.values():
+ user_time += process['cpu_user_time']
+ system_time += process['cpu_system_time']
+ memory_rss += process['memory_rss']
+
+ # Record metric to track memory usage by processes. The
+ # 'Processes/Memory/Physical' metric should be charted as
+ # 'Average'.
+
+ self.record('Processes/Memory/Physical[bytes]',
+ process['memory_rss'])
+
+ # Record metrics to track the number of context switches.
+ # The 'Processes/Context Switches' metrics should be charted
+ # as 'Rate'.
+
+ self.record('Processes/Context Switches/Voluntary[context]',
+ process['ctx_switch_voluntary'])
+ self.record('Processes/Context Switches/Involuntary[context]',
+ process['ctx_switch_involuntary'])
+
+ # Record metric to track combined memory usage of whole server.
+ # The 'Server/Memory/Physical' metric should be charted as
+ # 'Average'.
+
+ self.record('Server/Memory/Physical[bytes]', memory_rss)
+
+ # Record metric to track the CPU usage for user and system. The
+ # 'Processes/CPU Usage' metric should be charted as 'Rate'.
+
+ self.record('Processes/CPU Usage[cpu]', user_time + system_time)
+ self.record('Processes/CPU Usage/User[cpu]', user_time)
+ self.record('Processes/CPU Usage/System[cpu]', system_time)
+
+ # Now attempt to upload the metric data to New Relic. Make sure
+ # we don't try to upload data from too short a sampling period
+ # as it will be rejected anyway. Retain any that is too short so
+ # it is merged with the subsequent sampling period.
+
+ if self.epoch is not None:
+ duration = scoreboard.period_end - self.epoch
+ else:
+ duration = scoreboard.duration
+
+ if duration > 1.0:
+ retry = self.upload(self.metrics.metrics, duration)
+ else:
+ retry = True
+
+ # If a failure occurred but the failure type was such that we
+ # could try again to upload the data, then retain the metrics
+ # for next time. If we have too many failed attempts though we
+ # give up.
+
+ if retry:
+ self.retries += 1
+
+ if self.retries == self.max_retries:
+ self.rollover()
+
+ elif self.epoch is None:
+ self.epoch = scoreboard.period_start
+
+ else:
+ self.rollover()
def start(self):
if self.sampler is not None:
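
The configuration_settings() function added above resolves the application name and license key by consulting, in order, the [newrelic-platform:<environment>], [newrelic-platform], [newrelic:<environment>] and [newrelic] sections of the config file, falling back to the explicit arguments or the NEW_RELIC_APP_NAME and NEW_RELIC_LICENSE_KEY environment variables only when an option is not found in any section, and truncating the application name at the first ';'. The sketch below (not part of the commit) exercises that lookup order; it assumes the package is importable as 'metrics' inside a mod_wsgi hosted process (the module imports mod_wsgi at import time), and the file contents and license key are placeholders.

    import os
    import tempfile
    import textwrap

    from metrics.newrelic.agent import configuration_settings

    config = textwrap.dedent("""\
            [newrelic]
            license_key = LICENSE-KEY
            app_name = My Application;Second Application

            [newrelic-platform:production]
            app_name = My Platform Application
            """)

    with tempfile.NamedTemporaryFile('w', suffix='.ini', delete=False) as f:
        f.write(config)

    os.environ['NEW_RELIC_ENVIRONMENT'] = 'production'

    app_name, license_key = configuration_settings(config_file=f.name)

    # The environment specific platform section wins for app_name, and
    # anything after a ';' would have been discarded in any case.
    assert app_name == 'My Platform Application'
    assert license_key == 'LICENSE-KEY'
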
diff --git a/src/metrics/newrelic/interface.py b/src/metrics/newrelic/platform.py
index a9db15f..302bac0 100644
--- a/src/metrics/newrelic/interface.py
+++ b/src/metrics/newrelic/platform.py
@@ -52,9 +52,9 @@ def json_encode(obj, **kwargs):
def json_decode(s, **kwargs):
return json.loads(s, **kwargs)
-# Platform plugin interface.
+# Platform plugin client.
-class Interface(object):
+class Client(object):
class NetworkInterfaceException(Exception): pass
class DiscardDataForRequest(NetworkInterfaceException): pass
@@ -90,7 +90,7 @@ class Interface(object):
_logger.exception('Error encoding data for JSON payload '
'with payload of %r.', payload)
- raise Interface.DiscardDataForRequest(str(exc))
+ raise Client.DiscardDataForRequest(str(exc))
if len(data) > 64*1024:
headers['Content-Encoding'] = 'deflate'
@@ -104,7 +104,7 @@ class Interface(object):
content = response.read()
except httplib.HTTPException as exc:
- raise Interface.RetryDataForRequest(str(exc))
+ raise Client.RetryDataForRequest(str(exc))
finally:
connection.close()
@@ -123,13 +123,13 @@ class Interface(object):
'payload of %r with response of %r.', headers, data,
content)
- raise Interface.DiscardDataForRequest()
+ raise Client.DiscardDataForRequest()
elif response.status == 403:
_logger.error('Data collector is indicating that the license '
'key %r is not valid.', license_key)
- raise Interface.DiscardDataForRequest()
+ raise Client.DiscardDataForRequest()
elif response.status == 413:
_logger.warning('Data collector is indicating that a request '
@@ -137,19 +137,19 @@ class Interface(object):
'the maximum allowed size limit. The length of the '
'request content was %d.', len(data))
- raise Interface.DiscardDataForRequest()
+ raise Client.DiscardDataForRequest()
elif response.status in (503, 504):
_logger.warning('Data collector is unavailable.')
- raise Interface.ServerIsUnavailable()
+ raise Client.ServerIsUnavailable()
elif response.status != 200:
_logger.warning('An unexpected HTTP response was received '
'from the data collector of %r. The payload for '
'the request was %r.', response.status, payload)
- raise Interface.DiscardDataForRequest()
+ raise Client.DiscardDataForRequest()
try:
if PY3:
@@ -161,14 +161,14 @@ class Interface(object):
_logger.exception('Error decoding data for JSON payload '
'with payload of %r.', content)
- raise Interface.DiscardDataForRequest(str(exc))
+ raise Client.DiscardDataForRequest(str(exc))
if 'status' in result:
return result['status']
error_message = result['error']
- raise Interface.DiscardDataForRequest(error_message)
+ raise Client.DiscardDataForRequest(error_message)
def send_metrics(self, name, guid, version, duration, metrics):
agent = {}
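
The renamed Client keeps the same exception driven contract as the old Interface: DiscardDataForRequest means the payload should be dropped (bad JSON, invalid license key, oversize request, unexpected response), while RetryDataForRequest signals a transient failure where the same metrics can be resent later. The short sketch below (not part of the commit) mirrors how Agent.upload() above consumes that contract; the import path and license key are placeholders.

    from metrics.newrelic.platform import Client

    client = Client('LICENSE-KEY')

    def upload(name, guid, version, duration, metrics):
        """Returns True if the metrics should be retained and retried."""
        try:
            client.send_metrics(name, guid, version, duration, metrics)
        except Client.RetryDataForRequest:
            # Transient failure; keep the metrics so they can be merged
            # into the next reporting period.
            return True
        except Exception:
            # DiscardDataForRequest or anything unexpected; drop them.
            pass
        return False
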
diff --git a/src/metrics/newrelic/sampler.py b/src/metrics/newrelic/sampler.py
deleted file mode 100644
index ab2e767..0000000
--- a/src/metrics/newrelic/sampler.py
+++ /dev/null
@@ -1,604 +0,0 @@
-import threading
-import atexit
-import os
-import sys
-import json
-import socket
-import time
-import math
-
-try:
- import Queue as queue
-except ImportError:
- import queue
-
-import mod_wsgi
-
-SERVER_READY = '_'
-SERVER_STARTING = 'S'
-SERVER_BUSY_READ = 'R'
-SERVER_BUSY_WRITE = 'W'
-SERVER_BUST_KEEPALIVE = 'K'
-SERVER_BUSY_LOG = 'L'
-SERVER_BUSY_DNS = 'D'
-SERVER_CLOSING = 'C'
-SERVER_GRACEFUL = 'G'
-SERVER_IDLE_KILL = 'I'
-SERVER_DEAD = '.'
-
-STATUS_FLAGS = {
- SERVER_READY: 'Ready',
- SERVER_STARTING: 'Starting',
- SERVER_BUSY_READ: 'Read',
- SERVER_BUSY_WRITE: 'Write',
- SERVER_BUST_KEEPALIVE: 'Keepalive',
- SERVER_BUSY_LOG: 'Logging',
- SERVER_BUSY_DNS: 'DNS lookup',
- SERVER_CLOSING: 'Closing',
- SERVER_GRACEFUL: 'Graceful',
- SERVER_IDLE_KILL: 'Dying',
- SERVER_DEAD: 'Dead'
-}
-
-class Sample(dict):
-
- def __init__(self, count=0, total=0.0, min=0.0, max=0.0,
- sum_of_squares=0.0):
- self.count = count
- self.total = total
- self.min = min
- self.max = max
- self.sum_of_squares = sum_of_squares
-
- def __setattr__(self, name, value):
- self[name] = value
-
- def __getattr__(self, name):
- return self[name]
-
- def merge_stats(self, other):
- self.total += other.total
- self.min = self.count and min(self.min, other.min) or other.min
- self.max = max(self.max, other.max)
- self.sum_of_squares += other.sum_of_squares
- self.count += other.count
-
- def merge_value(self, value):
- self.total += value
- self.min = self.count and min(self.min, value) or value
- self.max = max(self.max, value)
- self.sum_of_squares += value ** 2
- self.count += 1
-
-class Samples(object):
-
- def __init__(self):
- self.samples = {}
-
- def __iter__(self):
- return iter(self.samples.items())
-
- def sample_name(self, name):
- return 'Component/' + name
-
- def _assign_value(self, value):
- if isinstance(value, Sample):
- sample = value
- self.samples[name] = sample
- else:
- sample = Sample()
- self.samples[name] = sample
- sample.merge_value(value)
-
- return sample
-
- def assign_value(self, value):
- name = self.sample_name(name)
-
- return self._assign_value(name)
-
- def _merge_value(self, name, value):
- sample = self.samples.get(name)
-
- if sample is None:
- sample = Sample()
- self.samples[name] = sample
-
- if isinstance(value, Sample):
- sample.merge_stats(value)
- else:
- sample.merge_value(value)
-
- return sample
-
- def merge_value(self, name, value):
- name = self.sample_name(name)
-
- return self._merge_value(name, value)
-
- def fetch_sample(self, name):
- name = self.sample_name(name)
-
- sample = self.samples.get(name)
-
- if sample is None:
- sample = Sample()
- self.samples[name] = sample
-
- return sample
-
- def merge_samples(self, samples):
- for name, sample in samples:
- self._merge_value(name, sample)
-
- def assign_samples(self, samples):
- for name, sample in samples:
- self._assign_value(name, sample)
-
- def clear_samples(self):
- self.samples.clear()
-
-class Sampler(object):
-
- guid = 'au.com.dscpl.wsgi.mod_wsgi'
- version = '1.0.0'
-
- def __init__(self, interface, name):
- self.interface = interface
- self.name = name
-
- self.running = False
- self.lock = threading.Lock()
-
- self.period_start = 0
- self.access_count = 0
- self.bytes_served = 0
-
- self.request_samples = []
-
- self.metric_data = Samples()
-
- self.report_queue = queue.Queue()
-
- self.report_thread = threading.Thread(target=self.report_main_loop)
- self.report_thread.setDaemon(True)
-
- self.report_start = 0
- self.report_metrics = Samples()
-
- self.monitor_queue = queue.Queue()
-
- self.monitor_thread = threading.Thread(target=self.monitor_main_loop)
- self.monitor_thread.setDaemon(True)
-
- self.monitor_count = 0
-
- def upload_report(self, start, end, metrics):
- try:
- self.interface.send_metrics(self.name, self.guid, self.version,
- end-start, metrics.samples)
-
- except self.interface.RetryDataForRequest:
- return True
-
- except Exception:
- pass
-
- return False
-
- def generate_request_metrics(self, harvest_data):
- metrics = Samples()
-
- # Chart as 'Throughput'.
-
- metrics.merge_value('Requests/Throughput[|requests]',
- Sample(count=harvest_data['access_count'],
- total=harvest_data['access_count']))
-
- # Calculate from the set of sampled requests the average
- # and percentile metrics.
-
- requests = harvest_data['request_samples']
-
- if requests:
- for request in requests:
- # Chart as 'Average'.
-
- metrics.merge_value('Requests/Response Time[seconds|request]',
- request['duration'])
-
- requests.sort(key=lambda e: e['duration'])
-
- total = sum([x['duration'] for x in requests])
-
- # Chart as 'Average'.
-
- metrics.merge_value('Requests/Percentiles/Average[seconds]',
- total/len(requests))
-
- idx50 = int(0.50 * len(requests))
- metrics.merge_value('Requests/Percentiles/Median[seconds]',
- requests[idx50]['duration'])
-
- idx95 = int(0.95 * len(requests))
- metrics.merge_value('Requests/Percentiles/95%[seconds]',
- requests[idx95]['duration'])
-
- idx99 = int(0.99 * len(requests))
- metrics.merge_value('Requests/Percentiles/99%[seconds]',
- requests[idx99]['duration'])
-
- # Chart as 'Rate'.
-
- metrics.merge_value('Requests/Bytes Served[bytes]',
- harvest_data['bytes_served'])
-
- return metrics
-
- def generate_process_metrics(self, harvest_data):
- metrics = Samples()
-
- # Chart as 'Count'. Round to Integer.
-
- metrics.merge_value('Processes/Instances[|processes]',
- Sample(count=math.ceil(float(
- harvest_data['processes_running']) /
- harvest_data['sample_count'])))
-
- metrics.merge_value('Processes/Lifecycle/Starting[|processes]',
- Sample(count=harvest_data['processes_started']))
-
- metrics.merge_value('Processes/Lifecycle/Stopping[|processes]',
- Sample(count=harvest_data['processes_stopped']))
-
- metrics.merge_value('Workers/Availability/Idle[|workers]',
- Sample(count=math.ceil(float(
- harvest_data['idle_workers']) /
- harvest_data['sample_count'])))
- metrics.merge_value('Workers/Availability/Busy[|workers]',
- Sample(count=math.ceil(float(
- harvest_data['busy_workers']) /
- harvest_data['sample_count'])))
-
- # Chart as 'Percentage'.
-
- metrics.merge_value('Workers/Utilization[server]',
- (float(harvest_data['busy_workers']) /
- harvest_data['sample_count']) / (
- harvest_data['server_limit']*harvest_data['thread_limit']))
-
- total = 0
- for value in harvest_data['worker_status'].values():
- value = float(value)/harvest_data['sample_count']
- total += value
-
- if total:
- for key, value in harvest_data['worker_status'].items():
- if key != SERVER_DEAD and value != 0:
- label = STATUS_FLAGS.get(key, 'Unknown')
-
- # Chart as 'Average'. Round to Integer.
-
- value = float(value)/harvest_data['sample_count']
-
- metrics.merge_value('Workers/Status/%s[workers]' %
- label, (value/total)*total)
-
- return metrics
-
- def report_main_loop(self):
- # We need a set of cached metrics for the case where
- # we fail in uploading the metric data and need to
- # retain it for the next attempt to upload data.
-
- retries = 0
- retained_start = 0
- retained = Samples()
-
- # We simply wait to be passed the metric data to be
- # reported for the current sample period.
-
- while True:
- harvest_data = self.report_queue.get()
-
- # If samples is None then we are being told to
- # exit as the process is being shutdown. Otherwise
- # we should be passed the cumulative metric data
- # and the set of sampled requests.
-
- if harvest_data is None:
- return
-
- start = harvest_data['period_start']
- end = harvest_data['period_end']
-
- metrics = harvest_data['metrics']
-
- # Add metric to track how many Apache server instances
- # are reporting for each sample period.
-
- # Chart as 'Count'. Round to Integer.
-
- metrics.merge_value('Server/Instances[|servers]', 0)
-
- # Generate percentiles metrics for request samples.
-
- metrics.merge_samples(self.generate_request_metrics(harvest_data))
- metrics.merge_samples(self.generate_process_metrics(harvest_data))
-
- # If we had metrics from a previous reporting period
- # because we couldn't upload the metric data, we need
- # to merge the data from the current reporting period
- # with that for the previous period.
-
- if retained.samples:
- start = retained_start
- retained.merge_samples(metrics)
- metrics = retained
-
- # Now attempt to upload the metric data.
-
- retry = self.upload_report(start, end, metrics)
-
- # If a failure occurred but failure type was such that we
- # could try again to upload the data, then retain them. If
- # have two many failed attempts though we give up.
-
- if retry:
- retries += 1
-
- if retries == 5:
- retries = 0
-
- else:
- retained = metrics
-
- else:
- retries = 0
-
- if retries == 0:
- retained_start = 0
- retained.clear_samples()
-
- else:
- retained_start = start
- retained = metrics
-
- def generate_scoreboard(self, sample_start=None):
- busy_workers = 0
- idle_workers = 0
- access_count = 0
- bytes_served = 0
-
- active_processes = 0
-
- scoreboard = mod_wsgi.server_metrics()
-
- if sample_start is None:
- sample_start = scoreboard['current_time']
-
- scoreboard['request_samples'] = request_samples = []
-
- for process in scoreboard['processes']:
- process['active_workers'] = 0
-
- for worker in process['workers']:
- status = worker['status']
-
- if not process['quiescing'] and process['pid']:
- if (status == SERVER_READY and process['generation'] ==
- scoreboard['running_generation']):
-
- process['active_workers'] += 1
- idle_workers += 1
-
- elif status not in (SERVER_DEAD, SERVER_STARTING,
- SERVER_IDLE_KILL):
-
- process['active_workers'] += 1
- busy_workers += 1
-
- count = worker['access_count']
-
- if count or status not in (SERVER_READY, SERVER_DEAD):
- access_count += count
- bytes_served += worker['bytes_served']
-
- current_time = scoreboard['current_time']
-
- start_time = worker['start_time']
- stop_time = worker['stop_time']
-
- if (stop_time > start_time and sample_start < stop_time
- and stop_time <= current_time):
-
- duration = stop_time - start_time
- thread_num = worker['thread_num']
-
- request_samples.append(dict(start_time=start_time,
- duration=duration, thread_num=thread_num))
-
- if process['active_workers']:
- active_processes += 1
-
- scoreboard['busy_workers'] = busy_workers
- scoreboard['idle_workers'] = idle_workers
- scoreboard['access_count'] = access_count
- scoreboard['bytes_served'] = bytes_served
-
- scoreboard['active_processes'] = active_processes
-
- return scoreboard
-
- def record_process_statistics(self, scoreboard, harvest_data):
- current_active_processes = scoreboard['active_processes']
- previous_active_processes = harvest_data['active_processes']
-
- harvest_data['active_processes'] = current_active_processes
- harvest_data['processes_running'] += current_active_processes
-
- if current_active_processes > previous_active_processes:
- harvest_data['processes_started'] += (current_active_processes -
- previous_active_processes)
-
- elif current_active_processes < previous_active_processes:
- harvest_data['processes_stopped'] += (previous_active_processes -
- current_active_processes)
-
- harvest_data['idle_workers'] += scoreboard['idle_workers']
- harvest_data['busy_workers'] += scoreboard['busy_workers']
-
- for process in scoreboard['processes']:
- for worker in process['workers']:
- harvest_data['worker_status'][worker['status']] += 1
-
- def monitor_main_loop(self):
- scoreboard = self.generate_scoreboard()
-
- harvest_start = scoreboard['current_time']
- sample_start = harvest_start
- sample_duration = 0.0
-
- access_count = scoreboard['access_count']
- bytes_served = scoreboard['bytes_served']
-
- harvest_data = {}
-
- harvest_data['sample_count'] = 0
- harvest_data['period_start'] = harvest_start
-
- harvest_data['metrics'] = Samples()
-
- harvest_data['request_samples'] = []
-
- harvest_data['active_processes'] = 0
-
- harvest_data['processes_running'] = 0
- harvest_data['processes_started'] = 0
- harvest_data['processes_stopped'] = 0
-
- harvest_data['idle_workers'] = 0
- harvest_data['busy_workers'] = 0
-
- harvest_data['server_limit'] = scoreboard['server_limit']
- harvest_data['thread_limit'] = scoreboard['thread_limit']
-
- harvest_data['worker_status'] = {}
-
- for status in STATUS_FLAGS.keys():
- harvest_data['worker_status'][status] = 0
-
- harvest_data['access_count'] = 0
- harvest_data['bytes_served'] = 0
-
- # Chart as 'Count'. Round to Integer.
-
- harvest_data['metrics'].merge_value('Server/Restarts[|servers]', 0)
-
- start = time.time()
- end = start + 60.0
-
- while True:
- try:
- # We want to collect metrics on a regular second
- # interval so we need to align the timeout value.
-
- now = time.time()
- start += 1.0
- timeout = start - now
-
- return self.monitor_queue.get(timeout=timeout)
-
- except queue.Empty:
- pass
-
- harvest_data['sample_count'] += 1
-
- scoreboard = self.generate_scoreboard(sample_start)
-
- harvest_end = scoreboard['current_time']
- sample_end = harvest_end
-
- sample_duration = sample_end - sample_start
-
- self.record_process_statistics(scoreboard, harvest_data)
-
- harvest_data['request_samples'].extend(
- scoreboard['request_samples'])
-
- access_count_delta = scoreboard['access_count']
- access_count_delta -= access_count
- access_count = scoreboard['access_count']
-
- harvest_data['access_count'] += access_count_delta
-
- bytes_served_delta = scoreboard['bytes_served']
- bytes_served_delta -= bytes_served
- bytes_served = scoreboard['bytes_served']
-
- harvest_data['bytes_served'] += bytes_served_delta
-
- now = time.time()
-
- if now >= end:
- harvest_data['period_end'] = harvest_end
-
- self.report_queue.put(harvest_data)
-
- harvest_start = harvest_end
- end += 60.0
-
- _harvest_data = {}
-
- _harvest_data['sample_count'] = 0
- _harvest_data['period_start'] = harvest_start
-
- _harvest_data['metrics'] = Samples()
-
- _harvest_data['request_samples'] = []
-
- _harvest_data['active_processes'] = (
- harvest_data['active_processes'])
-
- _harvest_data['processes_running'] = 0
- _harvest_data['processes_started'] = 0
- _harvest_data['processes_stopped'] = 0
-
- _harvest_data['idle_workers'] = 0
- _harvest_data['busy_workers'] = 0
-
- _harvest_data['server_limit'] = scoreboard['server_limit']
- _harvest_data['thread_limit'] = scoreboard['thread_limit']
-
- _harvest_data['worker_status'] = {}
-
- for status in STATUS_FLAGS.keys():
- _harvest_data['worker_status'][status] = 0
-
- _harvest_data['access_count'] = 0
- _harvest_data['bytes_served'] = 0
-
- harvest_data = _harvest_data
-
- sample_start = sample_end
-
- def terminate(self):
- try:
- self.report_queue.put(None)
- self.monitor_queue.put(None)
- except Exception:
- pass
-
- self.monitor_thread.join()
- self.report_thread.join()
-
- def start(self):
- if mod_wsgi.server_metrics() is None:
- return
-
- with self.lock:
- if not self.running:
- self.running = True
- atexit.register(self.terminate)
- self.monitor_thread.start()
- self.report_thread.start()
diff --git a/src/metrics/sampler.py b/src/metrics/sampler.py
new file mode 100644
index 0000000..754b2b0
--- /dev/null
+++ b/src/metrics/sampler.py
@@ -0,0 +1,125 @@
+import threading
+import atexit
+import time
+
+try:
+ import Queue as queue
+except ImportError:
+ import queue
+
+from .scoreboard import Scoreboard
+
+import mod_wsgi
+
+class Sampler(object):
+
+ sample_interval = 1.0
+ report_interval = 60.0
+
+ def __init__(self):
+ self.running = False
+ self.lock = threading.Lock()
+
+ self.sampler_queue = queue.Queue()
+
+ self.sampler_thread = threading.Thread(target=self.sampler_loop)
+ self.sampler_thread.setDaemon(True)
+
+ self.consumer_queue = queue.Queue()
+
+ self.consumer_thread = threading.Thread(target=self.consumer_loop)
+ self.consumer_thread.setDaemon(True)
+
+ self.consumers = []
+
+ def register(self, callback):
+ self.consumers.append(callback)
+
+ def consumer_loop(self):
+ while True:
+ scoreboard = self.consumer_queue.get()
+
+ for consumer in self.consumers:
+ consumer(scoreboard)
+
+ if scoreboard.sampler_exiting:
+ return
+
+ def distribute(self, scoreboard):
+ self.consumer_queue.put(scoreboard)
+
+ def sampler_loop(self):
+ scoreboard = Scoreboard()
+
+ scheduled_time = time.time()
+ period_end_time = scheduled_time + self.report_interval
+
+ while True:
+ try:
+ # We want to collect metrics on a regular second
+ # interval so we need to align the timeout value.
+
+ now = time.time()
+ scheduled_time += self.sample_interval
+ timeout = max(0, scheduled_time - now)
+
+ self.sampler_queue.get(timeout=timeout)
+
+ # If we get here we have been notified to exit.
+ # We update the scoreboard one last time and then
+ # distribute it to any consumers.
+
+ scoreboard.update(rollover=True, exiting=True)
+
+ self.distribute(scoreboard)
+
+ return
+
+ except queue.Empty:
+ pass
+
+ # Update the scoreboard for the current sampling period.
+ # Need to check first whether we will be rolling it over
+ # for the next sampling period as well, so that any special
+ # end of sampling period actions can be performed.
+
+ now = time.time()
+
+ if now >= period_end_time:
+ scoreboard.update(rollover=True)
+
+ # Distribute scoreboard to any consumers. It
+ # is expected that they will read but not update it,
+ # as the same instance is used for all of them.
+
+ self.distribute(scoreboard)
+
+ period_end_time += self.report_interval
+
+ # Rollover to a new scoreboard for the next
+ # sampling period.
+
+ scoreboard = scoreboard.rollover()
+
+ else:
+ scoreboard.update(rollover=False)
+
+ def terminate(self):
+ try:
+ self.sampler_queue.put(None)
+ except Exception:
+ pass
+
+ self.sampler_thread.join()
+ self.consumer_thread.join()
+
+ def start(self):
+ if mod_wsgi.server_metrics() is None:
+ return
+
+ with self.lock:
+ if not self.running:
+ self.running = True
+ atexit.register(self.terminate)
+ self.sampler_thread.start()
+ self.consumer_thread.start()
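
Consumers registered with Sampler.register() are all called, in registration order, from the single consumer thread with the same Scoreboard instance once per report_interval, plus a final call with sampler_exiting set when the process is shutting down, so they should read the scoreboard but not modify it. A small consumer sketch (not part of the commit):

    def utilization_consumer(scoreboard):
        if scoreboard.sampler_exiting:
            print('final period of %.1f seconds' % scoreboard.duration)
            return

        print('period of %.1f seconds, utilization %.2f%%' % (
                scoreboard.duration,
                100.0 * scoreboard.workers_utilization))

    # Register before calling start() on the sampler:
    #   sampler.register(utilization_consumer)
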
diff --git a/src/metrics/scoreboard.py b/src/metrics/scoreboard.py
new file mode 100644
index 0000000..df37ac9
--- /dev/null
+++ b/src/metrics/scoreboard.py
@@ -0,0 +1,568 @@
+import copy
+import math
+import psutil
+
+from collections import namedtuple
+
+from mod_wsgi import server_metrics as raw_server_metrics
+
+SERVER_READY = '_'
+SERVER_STARTING = 'S'
+SERVER_BUSY_READ = 'R'
+SERVER_BUSY_WRITE = 'W'
+SERVER_BUST_KEEPALIVE = 'K'
+SERVER_BUSY_LOG = 'L'
+SERVER_BUSY_DNS = 'D'
+SERVER_CLOSING = 'C'
+SERVER_GRACEFUL = 'G'
+SERVER_IDLE_KILL = 'I'
+SERVER_DEAD = '.'
+
+WORKER_STATUS = {
+ SERVER_READY: 'Ready',
+ SERVER_STARTING: 'Starting',
+ SERVER_BUSY_READ: 'Read',
+ SERVER_BUSY_WRITE: 'Write',
+ SERVER_BUST_KEEPALIVE: 'Keepalive',
+ SERVER_BUSY_LOG: 'Logging',
+ SERVER_BUSY_DNS: 'DNS lookup',
+ SERVER_CLOSING: 'Closing',
+ SERVER_GRACEFUL: 'Graceful',
+ SERVER_IDLE_KILL: 'Dying',
+ SERVER_DEAD: 'Dead'
+}
+
+def server_metrics():
+ """Returns server metrics, which are a combination of data from the
+ raw mod_wsgi server metrics, along with further data derived from
+ that raw data.
+
+ """
+
+ workers_busy = 0
+ workers_idle = 0
+
+ access_count = 0
+ bytes_served = 0
+
+ active_processes = 0
+
+ # Grab the raw server metrics.
+
+ result = raw_server_metrics()
+
+ # Loop over all the processes and workers they contain aggregating
+ # various details.
+
+ for process in result['processes']:
+ process['active_workers'] = 0
+
+ for worker in process['workers']:
+ # Here we determine whether a worker is busy or idle.
+
+ status = worker['status']
+
+ if not process['quiescing'] and process['pid']:
+ if (status == SERVER_READY and process['generation'] ==
+ result['running_generation']):
+
+ process['active_workers'] += 1
+ workers_idle += 1
+
+ elif status not in (SERVER_DEAD, SERVER_STARTING,
+ SERVER_IDLE_KILL):
+
+ process['active_workers'] += 1
+ workers_busy += 1
+
+ # Here we aggregate number of requests served and
+ # amount of bytes transferred.
+
+ count = worker['access_count']
+
+ if count or status not in (SERVER_READY, SERVER_DEAD):
+ access_count += count
+ bytes_served += worker['bytes_served']
+
+ if process['active_workers']:
+ active_processes += 1
+
+ result['workers_busy'] = workers_busy
+ result['workers_idle'] = workers_idle
+
+ result['access_count'] = access_count
+ result['bytes_served'] = bytes_served
+
+ result['active_processes'] = active_processes
+
+ return result
+
+RequestSample = namedtuple('RequestSample', 'start_time duration')
+
+class Scoreboard(object):
+
+ """Container for holding selected server metrics accumulated from
+ multiple samples making up a sampling period.
+
+ """
+
+ system_frequency = 1
+
+ def __init__(self):
+ # Setup the starting values. We need to grab an initial
+ # set of server metrics as a reference point for certain
+ # values.
+
+ data = server_metrics()
+
+ # Start of the period will be the time we just generated
+ # the initial server metrics used as a reference.
+
+ self.period_start = data['current_time']
+
+ # The current end time for the period always starts out
+ # as the same as the start time.
+
+ self.period_end = self.period_start
+
+ # Sample periods count tracks how many consecutive sample
+ # periods have been chained together.
+
+ self.sample_periods = 1
+
+ # Sample count tracks how many samples have been collected
+ # against this sample period.
+
+ self.sample_count = 0
+
+ # Sampler exiting flag indicates whether this is the final
+ # sampling period to be reported on due to the sampler
+ # exiting due to process shutdown or some other event.
+
+ self.sampler_exiting = False
+
+ # The server and thread limits are the maximum number of
+ # processes and workers per process that can be created.
+ # In practice the number of workers per process is always
+ # fixed at the thread limit as Apache doesn't dynamically
+ # adjust the number of running workers per process and
+ # instead always creates the maximum number and leaves it
+ # at that for the life of the process.
+
+ self.server_limit = data['server_limit']
+ self.thread_limit = data['thread_limit']
+
+ # Active processes is how many Apache child processes
+ # currently contain active workers. This is used between
+ # samples to determine whether, relative to the last sample,
+ # the number of processes increased or decreased.
+
+ self.active_processes = 0
+
+ # Running counters of the total number of running, starting
+ # or stopped processes across all samples. The count of
+ # running processes is used to determine the average number
+ # of processes running for the whole sample period. The
+ # counts of starting and stopping are used in reflecting
+ # the amount of process churn.
+
+ self.processes_running_count = 0
+ self.processes_started_count = 0
+ self.processes_stopped_count = 0
+
+ # Running counters of the total number of idle and busy
+ # workers across all samples. These counts are used to
+ # determine the average number of workers in each state
+ # for the whole sample period.
+
+ self.workers_idle_count = 0
+ self.workers_busy_count = 0
+
+ # Running counters of the actual worker statuses across
+ # all samples. These counts are used to determine the
+ # average number of workers in each state for the whole
+ # sample period. The statuses are a more fine grained
+ # depiction of the worker state compared to the summary
+ # state of idle or busy.
+
+ self.workers_status_count = dict.fromkeys(WORKER_STATUS.keys(), 0)
+
+ # Access count is the number of completed requests that
+ # have been handled by Apache. We have the total and a
+ # delta for the current sampling period.
+
+ self.access_count_total = data['access_count']
+ self.access_count_delta = 0
+
+ # Bytes served is the number of bytes which have been
+ # transferred by Apache. We have the total and a delta
+ # for the current sampling period.
+
+ self.bytes_served_total = data['bytes_served']
+ self.bytes_served_delta = 0
+
+ # Request samples is a list of details for a subset of
+ # requests derived from the server metrics. It is not
+ # possible to collect the details of every request. We
+ # can only even get samples where we see a worker, at the
+ # time of the sample, which hasn't yet started a new
+ # request and so can extract the details from the last
+ # request that the worker handled. If a worker is
+ # handling multiple requests between sample periods, we
+ # also only get the opportunity to see the details for
+ # the last one handled. The number of request samples
+ # should be bounded by the number of workers times the
+ # number of samples in the sample period.
+
+ self.request_samples = []
+
+ # Process system info records details of any processes
+ # such as memory, CPU usage and context switches.
+
+ self.processes_system_info = {}
+
+ @property
+ def duration(self):
+ """The duration of the sampling period.
+
+ """
+
+ return self.period_end - self.period_start
+
+ @property
+ def processes_running(self):
+ if self.sample_count == 0:
+ return 0
+
+ return math.ceil(float(self.processes_running_count) /
+ self.sample_count)
+
+ @property
+ def workers_idle(self):
+ if self.sample_count == 0:
+ return 0
+
+ return math.ceil(float(self.workers_idle_count) / self.sample_count)
+
+ @property
+ def workers_busy(self):
+ if self.sample_count == 0:
+ return 0
+
+ return math.ceil(float(self.workers_busy_count) / self.sample_count)
+
+ @property
+ def workers_utilization(self):
+ if self.sample_count == 0:
+ return 0
+
+ return (float(self.workers_busy_count) / self.sample_count) / (
+ self.server_limit * self.thread_limit)
+
+ @property
+ def workers_status(self):
+ result = {}
+
+ if self.sample_count == 0:
+ return result
+
+ total = 0
+
+ for value in self.workers_status_count.values():
+ value = float(value) / self.sample_count
+ total += value
+
+ if total:
+ for key, value in self.workers_status_count.items():
+ if key != SERVER_DEAD and value != 0:
+ label = WORKER_STATUS.get(key, 'Unknown')
+ value = float(value) / self.sample_count
+ result[label] = (value / total) * total
+
+ return result
+
+ @property
+ def request_percentiles(self):
+ result = {}
+
+ # Calculate from the set of sampled requests the average
+ # and percentile metrics.
+
+ requests = self.request_samples
+
+ if requests:
+ requests.sort(key=lambda e: e.duration)
+
+ total = sum([x.duration for x in requests])
+
+ # Chart as 'Average'.
+
+ result['Average'] = total/len(requests)
+
+ idx50 = int(0.50 * len(requests))
+ result['Median'] = requests[idx50].duration
+
+ idx95 = int(0.95 * len(requests))
+ result['95%'] = requests[idx95].duration
+
+ idx99 = int(0.99 * len(requests))
+ result['99%'] = requests[idx99].duration
+
+ return result
+
+ @property
+ def request_samples_quality(self):
+ if self.access_count_delta == 0:
+ return 0.0
+
+ return float(len(self.request_samples)) / self.access_count_delta
+
+ def update(self, rollover=False, exiting=False):
+ """Updates the scoreboard values for the current sampling
+ period by incorporating current server metrics.
+
+ """
+
+ # Grab the current server metrics.
+
+ data = server_metrics()
+
+ # Update times for current sampling period and number of
+ # samples taken.
+
+ sample_start = self.period_end
+ sample_end = data['current_time']
+ sample_duration = max(0, sample_end - sample_start)
+
+ self.period_end = sample_end
+
+ # Calculate changes in access count and bytes served since
+ # the last sample.
+
+ access_count_total = data['access_count']
+ access_count_delta = access_count_total - self.access_count_total
+
+ self.access_count_delta += access_count_delta
+ self.access_count_total = access_count_total
+
+ bytes_served_total = data['bytes_served']
+ bytes_served_delta = bytes_served_total - self.bytes_served_total
+
+ self.bytes_served_delta += bytes_served_delta
+ self.bytes_served_total = bytes_served_total
+
+ # Collect request samples. The requests must have completed
+ # since the last sample time and the worker must not have
+ # already started on a new request.
+
+ for process in data['processes']:
+ for worker in process['workers']:
+ start_time = worker['start_time']
+ stop_time = worker['stop_time']
+
+ if (stop_time > start_time and sample_start < stop_time
+ and stop_time <= sample_end):
+
+ self.request_samples.append(RequestSample(
+ start_time=start_time,
+ duration=stop_time-start_time))
+
+ # Calculate changes in the number of active, starting and
+ # stopping processes, and the number of idle and busy workers.
+
+ current_active_processes = data['active_processes']
+ previous_active_processes = self.active_processes
+
+ self.active_processes = current_active_processes
+ self.processes_running_count += current_active_processes
+
+ if current_active_processes > previous_active_processes:
+ self.processes_started_count += (current_active_processes -
+ previous_active_processes)
+
+ elif current_active_processes < previous_active_processes:
+ self.processes_stopped_count += (previous_active_processes -
+ current_active_processes)
+
+ self.workers_idle_count += data['workers_idle']
+ self.workers_busy_count += data['workers_busy']
+
+ for process in data['processes']:
+ for worker in process['workers']:
+ self.workers_status_count[worker['status']] += 1
+
+ # Record details about state of processes.
+
+ if self.sample_count % self.system_frequency == 0 or rollover:
+
+ # First we mark all process entries as being dead. We
+ # will then mark as alive those which truly are.
+
+ for details in self.processes_system_info.values():
+ details['dead'] = True
+
+ for process in data['processes']:
+ pid = process['pid']
+
+ if pid == 0:
+ continue
+
+ details = self.processes_system_info.get(pid)
+
+ if details is None:
+ details = dict(pid=pid)
+
+ details['duration'] = 0.0
+
+ details['cpu_times'] = None
+ details['cpu_user_time'] = 0.0
+ details['cpu_system_time'] = 0.0
+
+ details['ctx_switches'] = None
+ details['ctx_switch_voluntary'] = 0
+ details['ctx_switch_involuntary'] = 0
+
+ details['dead'] = False
+
+ try:
+ p = psutil.Process(pid)
+
+ except psutil.NoSuchProcess:
+ details['dead'] = True
+
+ continue
+
+ try:
+ rss, vms = p.memory_info()
+
+ details['memory_rss'] = rss
+ details['memory_vms'] = vms
+
+ except psutil.AccessDenied:
+ details['dead'] = True
+
+ continue
+
+ except Exception:
+ raise
+
+ try:
+ cpu_times = p.cpu_times()
+
+ if details['cpu_times'] is None:
+ details['cpu_times'] = cpu_times
+
+ # Note that we don't want to baseline CPU usage
+ # at zero the first time we see the process, as we
+ # want to capture any work performed in doing any
+ # startup initialisation of the process. This
+ # would occur before the first time we see it.
+ # Thus populate CPU usage with the initial values.
+ # There is a slight risk that we may in part
+ # apportion this to the wrong sampling period if it
+ # didn't fall within the sample, but there is
+ # nothing we can do about that.
+
+ details['cpu_user_time'] = cpu_times[0]
+ details['cpu_system_time'] = cpu_times[1]
+
+ else:
+ user_time = cpu_times[0] - details['cpu_times'][0]
+ system_time = cpu_times[1] - details['cpu_times'][1]
+
+ details['cpu_times'] = cpu_times
+ details['cpu_user_time'] += user_time
+ details['cpu_system_time'] += system_time
+
+ except psutil.AccessDenied:
+ details['dead'] = True
+
+ continue
+
+ except Exception:
+ raise
+
+ try:
+ ctx_switches = p.num_ctx_switches()
+
+ if details['ctx_switches'] is None:
+ details['ctx_switches'] = ctx_switches
+
+ else:
+ voluntary = (ctx_switches.voluntary -
+ details['ctx_switches'].voluntary)
+ involuntary = (ctx_switches.involuntary -
+ details['ctx_switches'].involuntary)
+
+ details['ctx_switches'] = ctx_switches
+ details['ctx_switch_voluntary'] += voluntary
+ details['ctx_switch_involuntary'] += involuntary
+
+ except psutil.AccessDenied:
+ details['dead'] = True
+
+ continue
+
+ except Exception:
+ raise
+
+ details['duration'] += sample_duration
+
+ self.processes_system_info[pid] = details
+
+ # Update the flag indicating whether the sampler is exiting
+ # and this is the final sampling period data to be supplied.
+
+ self.sampler_exiting = exiting
+
+ self.sample_count += 1
+
+ def rollover(self):
+ """Creates a copy of the current scoreboard and resets any
+ attributes back to initial values where appropriate for the
+ start of a new sampling period.
+
+ """
+
+ # Create a copy. A deep copy is used so that nested mutable
+ # state, such as the per process system info, isn't shared
+ # between the old and new scoreboards.
+
+ scoreboard = copy.deepcopy(self)
+
+ # Reset selected attributes back to initial values.
+
+ scoreboard.period_start = scoreboard.period_end
+
+ scoreboard.sample_count = 0
+
+ scoreboard.access_count_delta = 0
+ scoreboard.bytes_served_delta = 0
+
+ scoreboard.processes_running_count = 0
+ scoreboard.processes_started_count = 0
+ scoreboard.processes_stopped_count = 0
+
+ scoreboard.workers_idle_count = 0
+ scoreboard.workers_busy_count = 0
+
+ scoreboard.workers_status_count = dict.fromkeys(
+ WORKER_STATUS.keys(), 0)
+
+ scoreboard.request_samples = []
+
+ # For record of processes, we want to remove just the dead ones.
+
+ for pid, details in list(scoreboard.processes_system_info.items()):
+ if details['dead']:
+ del scoreboard.processes_system_info[pid]
+ else:
+ details['duration'] = 0.0
+ details['cpu_user_time'] = 0.0
+ details['cpu_system_time'] = 0.0
+ details['ctx_switch_voluntary'] = 0
+ details['ctx_switch_involuntary'] = 0
+
+ # Increment the count of successive sampling periods.
+
+ scoreboard.sample_periods += 1
+
+ return scoreboard
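
The derived Scoreboard properties above average the running per-sample counters over sample_count and compute request percentiles by indexing into the sorted list of sampled durations. A small worked example (not part of the commit) with made-up numbers, mirroring that arithmetic:

    import math

    sample_count = 60            # one sample per second for a minute
    workers_busy_count = 150     # busy workers summed over all samples

    # Mirrors Scoreboard.workers_busy: per sample average, rounded up.
    workers_busy = math.ceil(float(workers_busy_count) / sample_count)
    assert workers_busy == 3

    # Mirrors Scoreboard.request_percentiles for sampled durations
    # (in seconds), sorted before indexing.
    durations = sorted([0.05, 0.08, 0.10, 0.20, 1.50])

    average = sum(durations) / len(durations)
    median = durations[int(0.50 * len(durations))]
    p95 = durations[int(0.95 * len(durations))]

    assert median == 0.10 and p95 == 1.50
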
diff --git a/src/metrics/statistics.py b/src/metrics/statistics.py
new file mode 100644
index 0000000..9ad7bf0
--- /dev/null
+++ b/src/metrics/statistics.py
@@ -0,0 +1,89 @@
+import copy
+
+class Stats(dict):
+
+ def __init__(self, count=0, total=0.0, min=0.0, max=0.0,
+ sum_of_squares=0.0):
+
+ # Attribute names here must not change as this is what
+ # New Relic uses. Easier to adopt that convention rather
+ # than something slightly different.
+
+ self.count = count
+ self.total = total
+ self.min = min
+ self.max = max
+ self.sum_of_squares = sum_of_squares
+
+ def __setattr__(self, name, value):
+ self[name] = value
+
+ def __getattr__(self, name):
+ return self[name]
+
+ def merge_stats(self, other):
+ self.total += other.total
+ self.min = self.count and min(self.min, other.min) or other.min
+ self.max = max(self.max, other.max)
+ self.sum_of_squares += other.sum_of_squares
+ self.count += other.count
+
+ def merge_value(self, value):
+ self.total += value
+ self.min = self.count and min(self.min, value) or value
+ self.max = max(self.max, value)
+ self.sum_of_squares += value ** 2
+ self.count += 1
+
+class Metrics(object):
+
+ def __init__(self):
+ self.metrics = {}
+
+ def __iter__(self):
+ return iter(self.metrics.items())
+
+ def __len__(self):
+ return len(self.metrics)
+
+ def assign_value(self, name, value):
+ if isinstance(value, Stats):
+ sample = copy.copy(value)
+ self.metrics[name] = sample
+ else:
+ sample = Stats()
+ self.metrics[name] = sample
+ sample.merge_value(value)
+
+ return sample
+
+ def merge_value(self, name, value):
+ sample = self.fetch_stats(name)
+
+ if isinstance(value, Stats):
+ sample.merge_stats(value)
+ else:
+ sample.merge_value(value)
+
+ return sample
+
+ def fetch_stats(self, name):
+ sample = self.metrics.get(name)
+
+ if sample is None:
+ sample = Stats()
+ self.metrics[name] = sample
+
+ return sample
+
+ def merge_metrics(self, metrics):
+ for name, stats in metrics:
+ self.merge_value(name, stats)
+
+ def assign_metrics(self, metrics):
+ for name, stats in metrics:
+ self.assign_value(name, stats)
+
+ def clear_metrics(self):
+ self.metrics.clear()
+