diff options
author | Jenkins <jenkins@review.openstack.org> | 2014-09-27 14:40:55 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2014-09-27 14:40:55 +0000 |
commit | f7e2698b173c0f8275177f77661fd949f27979cc (patch) | |
tree | 5f9be199d1625b49ced43bc632d313af2a9015b1 | |
parent | 7beccc684d74472949d54b14b72d29ce74479931 (diff) | |
parent | d8317189e554c8378eefc615b73726f4b89791cb (diff) | |
download | ceilometer-f7e2698b173c0f8275177f77661fd949f27979cc.tar.gz |
Merge "Per-source separation of static resources & discovery"
-rw-r--r-- | ceilometer/agent.py | 87 | ||||
-rw-r--r-- | ceilometer/tests/agentbase.py | 56 |
2 files changed, 89 insertions, 54 deletions
diff --git a/ceilometer/agent.py b/ceilometer/agent.py index f906e019..cc0067a7 100644 --- a/ceilometer/agent.py +++ b/ceilometer/agent.py @@ -31,7 +31,7 @@ from ceilometer.openstack.common import context from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common import log from ceilometer.openstack.common import service as os_service -from ceilometer import pipeline +from ceilometer import pipeline as publish_pipeline LOG = log.getLogger(__name__) @@ -43,18 +43,22 @@ class Resources(object): def __init__(self, agent_manager): self.agent_manager = agent_manager self._resources = [] - self._discovery = set([]) + self._discovery = [] - def extend(self, pipeline): - self._resources.extend(pipeline.resources) - self._discovery.update(set(pipeline.discovery)) + def setup(self, pipeline): + self._resources = pipeline.resources + self._discovery = pipeline.discovery - @property - def resources(self): - source_discovery = (self.agent_manager.discover(self._discovery) + def get(self, discovery_cache=None): + source_discovery = (self.agent_manager.discover(self._discovery, + discovery_cache) if self._discovery else []) return self._resources + source_discovery + @staticmethod + def key(source, pollster): + return '%s-%s' % (source.name, pollster.name) + class PollingTask(object): """Polling task for polling samples and inject into pipeline. @@ -64,37 +68,44 @@ class PollingTask(object): def __init__(self, agent_manager): self.manager = agent_manager - self.pollsters = set() - # we extend the amalgamation of all static resources for this - # set of pollsters with a common interval, so as to also - # include any dynamically discovered resources specific to - # the matching pipelines (if either is present, the per-agent - # default discovery is overridden) + + # elements of the Cartesian product of sources X pollsters + # with a common interval + self.pollster_matches = set() + + # per-sink publisher contexts associated with each source + self.publishers = {} + + # we relate the static resources and per-source discovery to + # each combination of pollster and matching source resource_factory = lambda: Resources(agent_manager) self.resources = collections.defaultdict(resource_factory) - self.publish_context = pipeline.PublishContext( - agent_manager.context) - def add(self, pollster, pipelines): - self.publish_context.add_pipelines(pipelines) - for pipe_line in pipelines: - self.resources[pollster.name].extend(pipe_line) - self.pollsters.update([pollster]) + def add(self, pollster, pipeline): + if pipeline.source.name not in self.publishers: + publish_context = publish_pipeline.PublishContext( + self.manager.context) + self.publishers[pipeline.source.name] = publish_context + self.publishers[pipeline.source.name].add_pipelines([pipeline]) + self.pollster_matches.update([(pipeline.source, pollster)]) + key = Resources.key(pipeline.source, pollster) + self.resources[key].setup(pipeline) def poll_and_publish(self): """Polling sample and publish into pipeline.""" agent_resources = self.manager.discover() - with self.publish_context as publisher: - cache = {} - discovery_cache = {} - for pollster in self.pollsters: - key = pollster.name - LOG.info(_("Polling pollster %s"), key) - pollster_resources = None - if pollster.obj.default_discovery: - pollster_resources = self.manager.discover( - [pollster.obj.default_discovery], discovery_cache) - source_resources = list(self.resources[key].resources) + cache = {} + discovery_cache = {} + for source, pollster in self.pollster_matches: + LOG.info(_("Polling pollster %(poll)s in the context of %(src)s"), + dict(poll=pollster.name, src=source)) + pollster_resources = None + if pollster.obj.default_discovery: + pollster_resources = self.manager.discover( + [pollster.obj.default_discovery], discovery_cache) + key = Resources.key(source, pollster) + source_resources = list(self.resources[key].get(discovery_cache)) + with self.publishers[source.name] as publisher: try: samples = list(pollster.obj.get_samples( manager=self.manager, @@ -145,15 +156,15 @@ class AgentManager(os_service.Service): def setup_polling_tasks(self): polling_tasks = {} - for pipe_line, pollster in itertools.product( + for pipeline, pollster in itertools.product( self.pipeline_manager.pipelines, self.pollster_manager.extensions): - if pipe_line.support_meter(pollster.name): - polling_task = polling_tasks.get(pipe_line.get_interval()) + if pipeline.support_meter(pollster.name): + polling_task = polling_tasks.get(pipeline.get_interval()) if not polling_task: polling_task = self.create_polling_task() - polling_tasks[pipe_line.get_interval()] = polling_task - polling_task.add(pollster, [pipe_line]) + polling_tasks[pipeline.get_interval()] = polling_task + polling_task.add(pollster, pipeline) return polling_tasks @@ -163,7 +174,7 @@ class AgentManager(os_service.Service): if discovery_group_id else None) def start(self): - self.pipeline_manager = pipeline.setup_pipeline() + self.pipeline_manager = publish_pipeline.setup_pipeline() self.partition_coordinator.start() self.join_partitioning_groups() diff --git a/ceilometer/tests/agentbase.py b/ceilometer/tests/agentbase.py index a582109e..39e8d754 100644 --- a/ceilometer/tests/agentbase.py +++ b/ceilometer/tests/agentbase.py @@ -81,7 +81,7 @@ class TestPollster(plugin.PollsterBase): resources = resources or [] self.samples.append((manager, resources)) self.resources.extend(resources) - c = copy.copy(self.test_data) + c = copy.deepcopy(self.test_data) c.resource_metadata['resources'] = resources return [c] @@ -310,7 +310,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase): per_task_resources = polling_tasks[60].resources self.assertEqual(1, len(per_task_resources)) self.assertEqual(set(self.pipeline_cfg[0]['resources']), - set(per_task_resources['test'].resources)) + set(per_task_resources['test_pipeline-test'].get({}))) self.mgr.interval_task(polling_tasks.values()[0]) pub = self.mgr.pipeline_manager.pipelines[0].publishers[0] del pub.samples[0].resource_metadata['resources'] @@ -357,14 +357,16 @@ class BaseAgentManagerTestCase(base.BaseTestCase): self.setup_pipeline() polling_tasks = self.mgr.setup_polling_tasks() self.assertEqual(1, len(polling_tasks)) - pollsters = polling_tasks.get(60).pollsters + pollsters = polling_tasks.get(60).pollster_matches self.assertEqual(2, len(pollsters)) per_task_resources = polling_tasks[60].resources self.assertEqual(2, len(per_task_resources)) + key = 'test_pipeline-test' self.assertEqual(set(self.pipeline_cfg[0]['resources']), - set(per_task_resources['test'].resources)) + set(per_task_resources[key].get({}))) + key = 'test_pipeline-testanother' self.assertEqual(set(self.pipeline_cfg[1]['resources']), - set(per_task_resources['testanother'].resources)) + set(per_task_resources[key].get({}))) def test_interval_exception_isolation(self): self.pipeline_cfg = [ @@ -568,35 +570,57 @@ class BaseAgentManagerTestCase(base.BaseTestCase): ['static_1', 'static_2']) def test_multiple_pipelines_different_static_resources(self): - # assert that the amalgation of all static resources for a set - # of pipelines with a common interval is passed to individual - # pollsters matching those pipelines + # assert that the individual lists of static and discovered resources + # for each pipeline with a common interval are passed to individual + # pollsters matching each pipeline self.pipeline_cfg[0]['resources'] = ['test://'] + self.pipeline_cfg[0]['discovery'] = ['testdiscovery'] self.pipeline_cfg.append({ 'name': "another_pipeline", 'interval': 60, 'counters': ['test'], 'resources': ['another://'], + 'discovery': ['testdiscoveryanother'], 'transformers': [], 'publishers': ["new"], }) self.mgr.discovery_manager = self.create_discovery_manager() - self.Discovery.resources = [] + self.Discovery.resources = ['discovered_1', 'discovered_2'] + self.DiscoveryAnother.resources = ['discovered_3', 'discovered_4'] self.setup_pipeline() polling_tasks = self.mgr.setup_polling_tasks() self.assertEqual(1, len(polling_tasks)) self.assertTrue(60 in polling_tasks.keys()) self.mgr.interval_task(polling_tasks.get(60)) - self._verify_discovery_params([]) - self.assertEqual(1, len(self.Pollster.samples)) - amalgamated_resources = set(['test://', 'another://']) - self.assertEqual(amalgamated_resources, - set(self.Pollster.samples[0][1])) + self.assertEqual([None], self.Discovery.params) + self.assertEqual([None], self.DiscoveryAnother.params) + self.assertEqual(2, len(self.Pollster.samples)) + samples = self.Pollster.samples + test_resources = ['test://', 'discovered_1', 'discovered_2'] + another_resources = ['another://', 'discovered_3', 'discovered_4'] + if samples[0][1] == test_resources: + self.assertEqual(another_resources, samples[1][1]) + elif samples[0][1] == another_resources: + self.assertEqual(test_resources, samples[1][1]) + else: + self.fail('unexpected sample resources %s' % samples) + all_resources = set(test_resources) + all_resources.update(another_resources) + expected_pipelines = {'test://': 'test_pipeline', + 'another://': 'another_pipeline'} + sunk_resources = [] for pipe_line in self.mgr.pipeline_manager.pipelines: self.assertEqual(1, len(pipe_line.publishers[0].samples)) published = pipe_line.publishers[0].samples[0] - self.assertEqual(amalgamated_resources, - set(published.resource_metadata['resources'])) + published_resources = published.resource_metadata['resources'] + self.assertEqual(3, len(published_resources)) + self.assertTrue(published_resources[0] in expected_pipelines) + self.assertEqual(expected_pipelines[published_resources[0]], + pipe_line.name) + for published_resource in published_resources: + self.assertTrue(published_resource in all_resources) + sunk_resources.extend(published_resources) + self.assertEqual(all_resources, set(sunk_resources)) def test_multiple_sources_different_discoverers(self): self.Discovery.resources = ['discovered_1', 'discovered_2'] |