summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEoghan Glynn <eglynn@redhat.com>2014-09-24 09:56:09 +0000
committerEoghan Glynn <eglynn@redhat.com>2014-09-26 13:50:02 +0100
commitd8317189e554c8378eefc615b73726f4b89791cb (patch)
tree0facde9c3f50b8a1f86d368def593130e2bbf3ad
parent94ebf0042925c93f5bc178df18cd4d8c8491b0d3 (diff)
downloadceilometer-d8317189e554c8378eefc615b73726f4b89791cb.tar.gz
Per-source separation of static resources & discovery
Previously, the amalgamation of static resources and discovery extensions defined for all matching pipeline sources were passed to each pollster on each polling cycle. This led to unintended duplication of the samples emitted when an individual pollster matched several sources. Now, we relate the static resources and discovery extensions to the originating sources and only pass these when a pollster is traversed in the context of that source. Similarly, sinks are now related to the originating source and samples are only published over the sinks corresponding to the current sources. Closes-Bug: #1357869 Change-Id: Ie973625325ba3e25c76c90e4792eeaf466ada657
-rw-r--r--ceilometer/agent.py87
-rw-r--r--ceilometer/tests/agentbase.py56
2 files changed, 89 insertions, 54 deletions
diff --git a/ceilometer/agent.py b/ceilometer/agent.py
index f906e019..cc0067a7 100644
--- a/ceilometer/agent.py
+++ b/ceilometer/agent.py
@@ -31,7 +31,7 @@ from ceilometer.openstack.common import context
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import log
from ceilometer.openstack.common import service as os_service
-from ceilometer import pipeline
+from ceilometer import pipeline as publish_pipeline
LOG = log.getLogger(__name__)
@@ -43,18 +43,22 @@ class Resources(object):
def __init__(self, agent_manager):
self.agent_manager = agent_manager
self._resources = []
- self._discovery = set([])
+ self._discovery = []
- def extend(self, pipeline):
- self._resources.extend(pipeline.resources)
- self._discovery.update(set(pipeline.discovery))
+ def setup(self, pipeline):
+ self._resources = pipeline.resources
+ self._discovery = pipeline.discovery
- @property
- def resources(self):
- source_discovery = (self.agent_manager.discover(self._discovery)
+ def get(self, discovery_cache=None):
+ source_discovery = (self.agent_manager.discover(self._discovery,
+ discovery_cache)
if self._discovery else [])
return self._resources + source_discovery
+ @staticmethod
+ def key(source, pollster):
+ return '%s-%s' % (source.name, pollster.name)
+
class PollingTask(object):
"""Polling task for polling samples and inject into pipeline.
@@ -64,37 +68,44 @@ class PollingTask(object):
def __init__(self, agent_manager):
self.manager = agent_manager
- self.pollsters = set()
- # we extend the amalgamation of all static resources for this
- # set of pollsters with a common interval, so as to also
- # include any dynamically discovered resources specific to
- # the matching pipelines (if either is present, the per-agent
- # default discovery is overridden)
+
+ # elements of the Cartesian product of sources X pollsters
+ # with a common interval
+ self.pollster_matches = set()
+
+ # per-sink publisher contexts associated with each source
+ self.publishers = {}
+
+ # we relate the static resources and per-source discovery to
+ # each combination of pollster and matching source
resource_factory = lambda: Resources(agent_manager)
self.resources = collections.defaultdict(resource_factory)
- self.publish_context = pipeline.PublishContext(
- agent_manager.context)
- def add(self, pollster, pipelines):
- self.publish_context.add_pipelines(pipelines)
- for pipe_line in pipelines:
- self.resources[pollster.name].extend(pipe_line)
- self.pollsters.update([pollster])
+ def add(self, pollster, pipeline):
+ if pipeline.source.name not in self.publishers:
+ publish_context = publish_pipeline.PublishContext(
+ self.manager.context)
+ self.publishers[pipeline.source.name] = publish_context
+ self.publishers[pipeline.source.name].add_pipelines([pipeline])
+ self.pollster_matches.update([(pipeline.source, pollster)])
+ key = Resources.key(pipeline.source, pollster)
+ self.resources[key].setup(pipeline)
def poll_and_publish(self):
"""Polling sample and publish into pipeline."""
agent_resources = self.manager.discover()
- with self.publish_context as publisher:
- cache = {}
- discovery_cache = {}
- for pollster in self.pollsters:
- key = pollster.name
- LOG.info(_("Polling pollster %s"), key)
- pollster_resources = None
- if pollster.obj.default_discovery:
- pollster_resources = self.manager.discover(
- [pollster.obj.default_discovery], discovery_cache)
- source_resources = list(self.resources[key].resources)
+ cache = {}
+ discovery_cache = {}
+ for source, pollster in self.pollster_matches:
+ LOG.info(_("Polling pollster %(poll)s in the context of %(src)s"),
+ dict(poll=pollster.name, src=source))
+ pollster_resources = None
+ if pollster.obj.default_discovery:
+ pollster_resources = self.manager.discover(
+ [pollster.obj.default_discovery], discovery_cache)
+ key = Resources.key(source, pollster)
+ source_resources = list(self.resources[key].get(discovery_cache))
+ with self.publishers[source.name] as publisher:
try:
samples = list(pollster.obj.get_samples(
manager=self.manager,
@@ -145,15 +156,15 @@ class AgentManager(os_service.Service):
def setup_polling_tasks(self):
polling_tasks = {}
- for pipe_line, pollster in itertools.product(
+ for pipeline, pollster in itertools.product(
self.pipeline_manager.pipelines,
self.pollster_manager.extensions):
- if pipe_line.support_meter(pollster.name):
- polling_task = polling_tasks.get(pipe_line.get_interval())
+ if pipeline.support_meter(pollster.name):
+ polling_task = polling_tasks.get(pipeline.get_interval())
if not polling_task:
polling_task = self.create_polling_task()
- polling_tasks[pipe_line.get_interval()] = polling_task
- polling_task.add(pollster, [pipe_line])
+ polling_tasks[pipeline.get_interval()] = polling_task
+ polling_task.add(pollster, pipeline)
return polling_tasks
@@ -163,7 +174,7 @@ class AgentManager(os_service.Service):
if discovery_group_id else None)
def start(self):
- self.pipeline_manager = pipeline.setup_pipeline()
+ self.pipeline_manager = publish_pipeline.setup_pipeline()
self.partition_coordinator.start()
self.join_partitioning_groups()
diff --git a/ceilometer/tests/agentbase.py b/ceilometer/tests/agentbase.py
index a582109e..39e8d754 100644
--- a/ceilometer/tests/agentbase.py
+++ b/ceilometer/tests/agentbase.py
@@ -81,7 +81,7 @@ class TestPollster(plugin.PollsterBase):
resources = resources or []
self.samples.append((manager, resources))
self.resources.extend(resources)
- c = copy.copy(self.test_data)
+ c = copy.deepcopy(self.test_data)
c.resource_metadata['resources'] = resources
return [c]
@@ -310,7 +310,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
per_task_resources = polling_tasks[60].resources
self.assertEqual(1, len(per_task_resources))
self.assertEqual(set(self.pipeline_cfg[0]['resources']),
- set(per_task_resources['test'].resources))
+ set(per_task_resources['test_pipeline-test'].get({})))
self.mgr.interval_task(polling_tasks.values()[0])
pub = self.mgr.pipeline_manager.pipelines[0].publishers[0]
del pub.samples[0].resource_metadata['resources']
@@ -357,14 +357,16 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.setup_pipeline()
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(1, len(polling_tasks))
- pollsters = polling_tasks.get(60).pollsters
+ pollsters = polling_tasks.get(60).pollster_matches
self.assertEqual(2, len(pollsters))
per_task_resources = polling_tasks[60].resources
self.assertEqual(2, len(per_task_resources))
+ key = 'test_pipeline-test'
self.assertEqual(set(self.pipeline_cfg[0]['resources']),
- set(per_task_resources['test'].resources))
+ set(per_task_resources[key].get({})))
+ key = 'test_pipeline-testanother'
self.assertEqual(set(self.pipeline_cfg[1]['resources']),
- set(per_task_resources['testanother'].resources))
+ set(per_task_resources[key].get({})))
def test_interval_exception_isolation(self):
self.pipeline_cfg = [
@@ -568,35 +570,57 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
['static_1', 'static_2'])
def test_multiple_pipelines_different_static_resources(self):
- # assert that the amalgation of all static resources for a set
- # of pipelines with a common interval is passed to individual
- # pollsters matching those pipelines
+ # assert that the individual lists of static and discovered resources
+ # for each pipeline with a common interval are passed to individual
+ # pollsters matching each pipeline
self.pipeline_cfg[0]['resources'] = ['test://']
+ self.pipeline_cfg[0]['discovery'] = ['testdiscovery']
self.pipeline_cfg.append({
'name': "another_pipeline",
'interval': 60,
'counters': ['test'],
'resources': ['another://'],
+ 'discovery': ['testdiscoveryanother'],
'transformers': [],
'publishers': ["new"],
})
self.mgr.discovery_manager = self.create_discovery_manager()
- self.Discovery.resources = []
+ self.Discovery.resources = ['discovered_1', 'discovered_2']
+ self.DiscoveryAnother.resources = ['discovered_3', 'discovered_4']
self.setup_pipeline()
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(1, len(polling_tasks))
self.assertTrue(60 in polling_tasks.keys())
self.mgr.interval_task(polling_tasks.get(60))
- self._verify_discovery_params([])
- self.assertEqual(1, len(self.Pollster.samples))
- amalgamated_resources = set(['test://', 'another://'])
- self.assertEqual(amalgamated_resources,
- set(self.Pollster.samples[0][1]))
+ self.assertEqual([None], self.Discovery.params)
+ self.assertEqual([None], self.DiscoveryAnother.params)
+ self.assertEqual(2, len(self.Pollster.samples))
+ samples = self.Pollster.samples
+ test_resources = ['test://', 'discovered_1', 'discovered_2']
+ another_resources = ['another://', 'discovered_3', 'discovered_4']
+ if samples[0][1] == test_resources:
+ self.assertEqual(another_resources, samples[1][1])
+ elif samples[0][1] == another_resources:
+ self.assertEqual(test_resources, samples[1][1])
+ else:
+ self.fail('unexpected sample resources %s' % samples)
+ all_resources = set(test_resources)
+ all_resources.update(another_resources)
+ expected_pipelines = {'test://': 'test_pipeline',
+ 'another://': 'another_pipeline'}
+ sunk_resources = []
for pipe_line in self.mgr.pipeline_manager.pipelines:
self.assertEqual(1, len(pipe_line.publishers[0].samples))
published = pipe_line.publishers[0].samples[0]
- self.assertEqual(amalgamated_resources,
- set(published.resource_metadata['resources']))
+ published_resources = published.resource_metadata['resources']
+ self.assertEqual(3, len(published_resources))
+ self.assertTrue(published_resources[0] in expected_pipelines)
+ self.assertEqual(expected_pipelines[published_resources[0]],
+ pipe_line.name)
+ for published_resource in published_resources:
+ self.assertTrue(published_resource in all_resources)
+ sunk_resources.extend(published_resources)
+ self.assertEqual(all_resources, set(sunk_resources))
def test_multiple_sources_different_discoverers(self):
self.Discovery.resources = ['discovered_1', 'discovered_2']