diff options
author | Eoghan Glynn <eglynn@redhat.com> | 2014-09-24 09:56:09 +0000 |
---|---|---|
committer | Eoghan Glynn <eglynn@redhat.com> | 2014-09-26 13:50:02 +0100 |
commit | d8317189e554c8378eefc615b73726f4b89791cb (patch) | |
tree | 0facde9c3f50b8a1f86d368def593130e2bbf3ad | |
parent | 94ebf0042925c93f5bc178df18cd4d8c8491b0d3 (diff) | |
download | ceilometer-d8317189e554c8378eefc615b73726f4b89791cb.tar.gz |
Per-source separation of static resources & discovery
Previously, the amalgamation of static resources and discovery
extensions defined for all matching pipeline sources were passed
to each pollster on each polling cycle.
This led to unintended duplication of the samples emitted when
an individual pollster matched several sources.
Now, we relate the static resources and discovery extensions to
the originating sources and only pass these when a pollster is
traversed in the context of that source.
Similarly, sinks are now related to the originating source and
samples are only published over the sinks corresponding to the
current sources.
Closes-Bug: #1357869
Change-Id: Ie973625325ba3e25c76c90e4792eeaf466ada657
-rw-r--r-- | ceilometer/agent.py | 87 | ||||
-rw-r--r-- | ceilometer/tests/agentbase.py | 56 |
2 files changed, 89 insertions, 54 deletions
diff --git a/ceilometer/agent.py b/ceilometer/agent.py index f906e019..cc0067a7 100644 --- a/ceilometer/agent.py +++ b/ceilometer/agent.py @@ -31,7 +31,7 @@ from ceilometer.openstack.common import context from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common import log from ceilometer.openstack.common import service as os_service -from ceilometer import pipeline +from ceilometer import pipeline as publish_pipeline LOG = log.getLogger(__name__) @@ -43,18 +43,22 @@ class Resources(object): def __init__(self, agent_manager): self.agent_manager = agent_manager self._resources = [] - self._discovery = set([]) + self._discovery = [] - def extend(self, pipeline): - self._resources.extend(pipeline.resources) - self._discovery.update(set(pipeline.discovery)) + def setup(self, pipeline): + self._resources = pipeline.resources + self._discovery = pipeline.discovery - @property - def resources(self): - source_discovery = (self.agent_manager.discover(self._discovery) + def get(self, discovery_cache=None): + source_discovery = (self.agent_manager.discover(self._discovery, + discovery_cache) if self._discovery else []) return self._resources + source_discovery + @staticmethod + def key(source, pollster): + return '%s-%s' % (source.name, pollster.name) + class PollingTask(object): """Polling task for polling samples and inject into pipeline. @@ -64,37 +68,44 @@ class PollingTask(object): def __init__(self, agent_manager): self.manager = agent_manager - self.pollsters = set() - # we extend the amalgamation of all static resources for this - # set of pollsters with a common interval, so as to also - # include any dynamically discovered resources specific to - # the matching pipelines (if either is present, the per-agent - # default discovery is overridden) + + # elements of the Cartesian product of sources X pollsters + # with a common interval + self.pollster_matches = set() + + # per-sink publisher contexts associated with each source + self.publishers = {} + + # we relate the static resources and per-source discovery to + # each combination of pollster and matching source resource_factory = lambda: Resources(agent_manager) self.resources = collections.defaultdict(resource_factory) - self.publish_context = pipeline.PublishContext( - agent_manager.context) - def add(self, pollster, pipelines): - self.publish_context.add_pipelines(pipelines) - for pipe_line in pipelines: - self.resources[pollster.name].extend(pipe_line) - self.pollsters.update([pollster]) + def add(self, pollster, pipeline): + if pipeline.source.name not in self.publishers: + publish_context = publish_pipeline.PublishContext( + self.manager.context) + self.publishers[pipeline.source.name] = publish_context + self.publishers[pipeline.source.name].add_pipelines([pipeline]) + self.pollster_matches.update([(pipeline.source, pollster)]) + key = Resources.key(pipeline.source, pollster) + self.resources[key].setup(pipeline) def poll_and_publish(self): """Polling sample and publish into pipeline.""" agent_resources = self.manager.discover() - with self.publish_context as publisher: - cache = {} - discovery_cache = {} - for pollster in self.pollsters: - key = pollster.name - LOG.info(_("Polling pollster %s"), key) - pollster_resources = None - if pollster.obj.default_discovery: - pollster_resources = self.manager.discover( - [pollster.obj.default_discovery], discovery_cache) - source_resources = list(self.resources[key].resources) + cache = {} + discovery_cache = {} + for source, pollster in self.pollster_matches: + LOG.info(_("Polling pollster %(poll)s in the context of %(src)s"), + dict(poll=pollster.name, src=source)) + pollster_resources = None + if pollster.obj.default_discovery: + pollster_resources = self.manager.discover( + [pollster.obj.default_discovery], discovery_cache) + key = Resources.key(source, pollster) + source_resources = list(self.resources[key].get(discovery_cache)) + with self.publishers[source.name] as publisher: try: samples = list(pollster.obj.get_samples( manager=self.manager, @@ -145,15 +156,15 @@ class AgentManager(os_service.Service): def setup_polling_tasks(self): polling_tasks = {} - for pipe_line, pollster in itertools.product( + for pipeline, pollster in itertools.product( self.pipeline_manager.pipelines, self.pollster_manager.extensions): - if pipe_line.support_meter(pollster.name): - polling_task = polling_tasks.get(pipe_line.get_interval()) + if pipeline.support_meter(pollster.name): + polling_task = polling_tasks.get(pipeline.get_interval()) if not polling_task: polling_task = self.create_polling_task() - polling_tasks[pipe_line.get_interval()] = polling_task - polling_task.add(pollster, [pipe_line]) + polling_tasks[pipeline.get_interval()] = polling_task + polling_task.add(pollster, pipeline) return polling_tasks @@ -163,7 +174,7 @@ class AgentManager(os_service.Service): if discovery_group_id else None) def start(self): - self.pipeline_manager = pipeline.setup_pipeline() + self.pipeline_manager = publish_pipeline.setup_pipeline() self.partition_coordinator.start() self.join_partitioning_groups() diff --git a/ceilometer/tests/agentbase.py b/ceilometer/tests/agentbase.py index a582109e..39e8d754 100644 --- a/ceilometer/tests/agentbase.py +++ b/ceilometer/tests/agentbase.py @@ -81,7 +81,7 @@ class TestPollster(plugin.PollsterBase): resources = resources or [] self.samples.append((manager, resources)) self.resources.extend(resources) - c = copy.copy(self.test_data) + c = copy.deepcopy(self.test_data) c.resource_metadata['resources'] = resources return [c] @@ -310,7 +310,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase): per_task_resources = polling_tasks[60].resources self.assertEqual(1, len(per_task_resources)) self.assertEqual(set(self.pipeline_cfg[0]['resources']), - set(per_task_resources['test'].resources)) + set(per_task_resources['test_pipeline-test'].get({}))) self.mgr.interval_task(polling_tasks.values()[0]) pub = self.mgr.pipeline_manager.pipelines[0].publishers[0] del pub.samples[0].resource_metadata['resources'] @@ -357,14 +357,16 @@ class BaseAgentManagerTestCase(base.BaseTestCase): self.setup_pipeline() polling_tasks = self.mgr.setup_polling_tasks() self.assertEqual(1, len(polling_tasks)) - pollsters = polling_tasks.get(60).pollsters + pollsters = polling_tasks.get(60).pollster_matches self.assertEqual(2, len(pollsters)) per_task_resources = polling_tasks[60].resources self.assertEqual(2, len(per_task_resources)) + key = 'test_pipeline-test' self.assertEqual(set(self.pipeline_cfg[0]['resources']), - set(per_task_resources['test'].resources)) + set(per_task_resources[key].get({}))) + key = 'test_pipeline-testanother' self.assertEqual(set(self.pipeline_cfg[1]['resources']), - set(per_task_resources['testanother'].resources)) + set(per_task_resources[key].get({}))) def test_interval_exception_isolation(self): self.pipeline_cfg = [ @@ -568,35 +570,57 @@ class BaseAgentManagerTestCase(base.BaseTestCase): ['static_1', 'static_2']) def test_multiple_pipelines_different_static_resources(self): - # assert that the amalgation of all static resources for a set - # of pipelines with a common interval is passed to individual - # pollsters matching those pipelines + # assert that the individual lists of static and discovered resources + # for each pipeline with a common interval are passed to individual + # pollsters matching each pipeline self.pipeline_cfg[0]['resources'] = ['test://'] + self.pipeline_cfg[0]['discovery'] = ['testdiscovery'] self.pipeline_cfg.append({ 'name': "another_pipeline", 'interval': 60, 'counters': ['test'], 'resources': ['another://'], + 'discovery': ['testdiscoveryanother'], 'transformers': [], 'publishers': ["new"], }) self.mgr.discovery_manager = self.create_discovery_manager() - self.Discovery.resources = [] + self.Discovery.resources = ['discovered_1', 'discovered_2'] + self.DiscoveryAnother.resources = ['discovered_3', 'discovered_4'] self.setup_pipeline() polling_tasks = self.mgr.setup_polling_tasks() self.assertEqual(1, len(polling_tasks)) self.assertTrue(60 in polling_tasks.keys()) self.mgr.interval_task(polling_tasks.get(60)) - self._verify_discovery_params([]) - self.assertEqual(1, len(self.Pollster.samples)) - amalgamated_resources = set(['test://', 'another://']) - self.assertEqual(amalgamated_resources, - set(self.Pollster.samples[0][1])) + self.assertEqual([None], self.Discovery.params) + self.assertEqual([None], self.DiscoveryAnother.params) + self.assertEqual(2, len(self.Pollster.samples)) + samples = self.Pollster.samples + test_resources = ['test://', 'discovered_1', 'discovered_2'] + another_resources = ['another://', 'discovered_3', 'discovered_4'] + if samples[0][1] == test_resources: + self.assertEqual(another_resources, samples[1][1]) + elif samples[0][1] == another_resources: + self.assertEqual(test_resources, samples[1][1]) + else: + self.fail('unexpected sample resources %s' % samples) + all_resources = set(test_resources) + all_resources.update(another_resources) + expected_pipelines = {'test://': 'test_pipeline', + 'another://': 'another_pipeline'} + sunk_resources = [] for pipe_line in self.mgr.pipeline_manager.pipelines: self.assertEqual(1, len(pipe_line.publishers[0].samples)) published = pipe_line.publishers[0].samples[0] - self.assertEqual(amalgamated_resources, - set(published.resource_metadata['resources'])) + published_resources = published.resource_metadata['resources'] + self.assertEqual(3, len(published_resources)) + self.assertTrue(published_resources[0] in expected_pipelines) + self.assertEqual(expected_pipelines[published_resources[0]], + pipe_line.name) + for published_resource in published_resources: + self.assertTrue(published_resource in all_resources) + sunk_resources.extend(published_resources) + self.assertEqual(all_resources, set(sunk_resources)) def test_multiple_sources_different_discoverers(self): self.Discovery.resources = ['discovered_1', 'discovered_2'] |