author     Jenkins <jenkins@review.openstack.org>      2014-09-27 14:40:55 +0000
committer  Gerrit Code Review <review@openstack.org>   2014-09-27 14:40:55 +0000
commit     f7e2698b173c0f8275177f77661fd949f27979cc (patch)
tree       5f9be199d1625b49ced43bc632d313af2a9015b1
parent     7beccc684d74472949d54b14b72d29ce74479931 (diff)
parent     d8317189e554c8378eefc615b73726f4b89791cb (diff)
download   ceilometer-f7e2698b173c0f8275177f77661fd949f27979cc.tar.gz
Merge "Per-source separation of static resources & discovery"
-rw-r--r--  ceilometer/agent.py             | 87
-rw-r--r--  ceilometer/tests/agentbase.py   | 56
2 files changed, 89 insertions, 54 deletions
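
In short, this merge replaces the old amalgamation of static resources across all pipelines sharing an interval with per-source separation: the polling task now keys resources, discovery, and publishers by pipeline source, so a pollster only sees the static and discovered resources of the source it matches. The following is a minimal, self-contained sketch of the new keying scheme; the StubSource and StubPollster classes are illustrative stand-ins, not the actual Ceilometer pipeline source or pollster extension objects:

    # Minimal sketch of the per-source keying introduced by this change.
    # StubSource and StubPollster are illustrative stand-ins only.
    import collections

    class StubSource(object):
        def __init__(self, name):
            self.name = name

    class StubPollster(object):
        def __init__(self, name):
            self.name = name

    def key(source, pollster):
        # mirrors the Resources.key() helper added by this patch
        return '%s-%s' % (source.name, pollster.name)

    resources = collections.defaultdict(list)
    resources[key(StubSource('test_pipeline'), StubPollster('test'))].append('test://')
    resources[key(StubSource('another_pipeline'), StubPollster('test'))].append('another://')

    # each (source, pollster) pair keeps its own resource list, so the two
    # pipelines no longer share one amalgamated set of static resources
    print(resources['test_pipeline-test'])      # ['test://']
    print(resources['another_pipeline-test'])   # ['another://']
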
diff --git a/ceilometer/agent.py b/ceilometer/agent.py
index f906e019..cc0067a7 100644
--- a/ceilometer/agent.py
+++ b/ceilometer/agent.py
@@ -31,7 +31,7 @@ from ceilometer.openstack.common import context
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import log
from ceilometer.openstack.common import service as os_service
-from ceilometer import pipeline
+from ceilometer import pipeline as publish_pipeline
LOG = log.getLogger(__name__)
@@ -43,18 +43,22 @@ class Resources(object):
def __init__(self, agent_manager):
self.agent_manager = agent_manager
self._resources = []
- self._discovery = set([])
+ self._discovery = []
- def extend(self, pipeline):
- self._resources.extend(pipeline.resources)
- self._discovery.update(set(pipeline.discovery))
+ def setup(self, pipeline):
+ self._resources = pipeline.resources
+ self._discovery = pipeline.discovery
- @property
- def resources(self):
- source_discovery = (self.agent_manager.discover(self._discovery)
+ def get(self, discovery_cache=None):
+ source_discovery = (self.agent_manager.discover(self._discovery,
+ discovery_cache)
if self._discovery else [])
return self._resources + source_discovery
+ @staticmethod
+ def key(source, pollster):
+ return '%s-%s' % (source.name, pollster.name)
+
class PollingTask(object):
"""Polling task for polling samples and inject into pipeline.
@@ -64,37 +68,44 @@ class PollingTask(object):
def __init__(self, agent_manager):
self.manager = agent_manager
- self.pollsters = set()
- # we extend the amalgamation of all static resources for this
- # set of pollsters with a common interval, so as to also
- # include any dynamically discovered resources specific to
- # the matching pipelines (if either is present, the per-agent
- # default discovery is overridden)
+
+ # elements of the Cartesian product of sources X pollsters
+ # with a common interval
+ self.pollster_matches = set()
+
+ # per-sink publisher contexts associated with each source
+ self.publishers = {}
+
+ # we relate the static resources and per-source discovery to
+ # each combination of pollster and matching source
resource_factory = lambda: Resources(agent_manager)
self.resources = collections.defaultdict(resource_factory)
- self.publish_context = pipeline.PublishContext(
- agent_manager.context)
- def add(self, pollster, pipelines):
- self.publish_context.add_pipelines(pipelines)
- for pipe_line in pipelines:
- self.resources[pollster.name].extend(pipe_line)
- self.pollsters.update([pollster])
+ def add(self, pollster, pipeline):
+ if pipeline.source.name not in self.publishers:
+ publish_context = publish_pipeline.PublishContext(
+ self.manager.context)
+ self.publishers[pipeline.source.name] = publish_context
+ self.publishers[pipeline.source.name].add_pipelines([pipeline])
+ self.pollster_matches.update([(pipeline.source, pollster)])
+ key = Resources.key(pipeline.source, pollster)
+ self.resources[key].setup(pipeline)
def poll_and_publish(self):
"""Polling sample and publish into pipeline."""
agent_resources = self.manager.discover()
- with self.publish_context as publisher:
- cache = {}
- discovery_cache = {}
- for pollster in self.pollsters:
- key = pollster.name
- LOG.info(_("Polling pollster %s"), key)
- pollster_resources = None
- if pollster.obj.default_discovery:
- pollster_resources = self.manager.discover(
- [pollster.obj.default_discovery], discovery_cache)
- source_resources = list(self.resources[key].resources)
+ cache = {}
+ discovery_cache = {}
+ for source, pollster in self.pollster_matches:
+ LOG.info(_("Polling pollster %(poll)s in the context of %(src)s"),
+ dict(poll=pollster.name, src=source))
+ pollster_resources = None
+ if pollster.obj.default_discovery:
+ pollster_resources = self.manager.discover(
+ [pollster.obj.default_discovery], discovery_cache)
+ key = Resources.key(source, pollster)
+ source_resources = list(self.resources[key].get(discovery_cache))
+ with self.publishers[source.name] as publisher:
try:
samples = list(pollster.obj.get_samples(
manager=self.manager,
@@ -145,15 +156,15 @@ class AgentManager(os_service.Service):
def setup_polling_tasks(self):
polling_tasks = {}
- for pipe_line, pollster in itertools.product(
+ for pipeline, pollster in itertools.product(
self.pipeline_manager.pipelines,
self.pollster_manager.extensions):
- if pipe_line.support_meter(pollster.name):
- polling_task = polling_tasks.get(pipe_line.get_interval())
+ if pipeline.support_meter(pollster.name):
+ polling_task = polling_tasks.get(pipeline.get_interval())
if not polling_task:
polling_task = self.create_polling_task()
- polling_tasks[pipe_line.get_interval()] = polling_task
- polling_task.add(pollster, [pipe_line])
+ polling_tasks[pipeline.get_interval()] = polling_task
+ polling_task.add(pollster, pipeline)
return polling_tasks
@@ -163,7 +174,7 @@ class AgentManager(os_service.Service):
if discovery_group_id else None)
def start(self):
- self.pipeline_manager = pipeline.setup_pipeline()
+ self.pipeline_manager = publish_pipeline.setup_pipeline()
self.partition_coordinator.start()
self.join_partitioning_groups()
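
The net effect on PollingTask.poll_and_publish() is that each (source, pollster) pair polls only its own static plus discovered resources and publishes through the publisher context registered for that source, rather than through one shared context for the whole interval. Below is a rough, self-contained sketch of that loop shape under simplified assumptions; the stub classes are illustrative only and do not reflect the real manager, pollster extensions, or publishers:

    # Rough sketch of the per-source polling loop; all classes here are
    # simplified stand-ins for illustration, not the real Ceilometer objects.
    class StubPublisherContext(object):
        def __init__(self):
            self.published = []

        def __enter__(self):
            return self.published.extend  # acts as the "publisher" callable

        def __exit__(self, *exc_info):
            return False

    class StubPollster(object):
        def __init__(self, name):
            self.name = name

        def get_samples(self, resources):
            return ['%s sample for %s' % (self.name, r) for r in resources]

    class StubSource(object):
        def __init__(self, name):
            self.name = name

    pollster = StubPollster('test')
    sources = [StubSource('test_pipeline'), StubSource('another_pipeline')]
    resources = {'test_pipeline-test': ['test://'],
                 'another_pipeline-test': ['another://']}
    publishers = dict((s.name, StubPublisherContext()) for s in sources)

    # mirrors the shape of the new poll_and_publish(): poll per source,
    # publish via the matching source's own publisher context
    for source in sources:
        source_resources = resources['%s-%s' % (source.name, pollster.name)]
        with publishers[source.name] as publish:
            publish(pollster.get_samples(source_resources))

    print(publishers['test_pipeline'].published)     # ['test sample for test://']
    print(publishers['another_pipeline'].published)  # ['another sample for another://']
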
diff --git a/ceilometer/tests/agentbase.py b/ceilometer/tests/agentbase.py
index a582109e..39e8d754 100644
--- a/ceilometer/tests/agentbase.py
+++ b/ceilometer/tests/agentbase.py
@@ -81,7 +81,7 @@ class TestPollster(plugin.PollsterBase):
resources = resources or []
self.samples.append((manager, resources))
self.resources.extend(resources)
- c = copy.copy(self.test_data)
+ c = copy.deepcopy(self.test_data)
c.resource_metadata['resources'] = resources
return [c]
@@ -310,7 +310,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
per_task_resources = polling_tasks[60].resources
self.assertEqual(1, len(per_task_resources))
self.assertEqual(set(self.pipeline_cfg[0]['resources']),
- set(per_task_resources['test'].resources))
+ set(per_task_resources['test_pipeline-test'].get({})))
self.mgr.interval_task(polling_tasks.values()[0])
pub = self.mgr.pipeline_manager.pipelines[0].publishers[0]
del pub.samples[0].resource_metadata['resources']
@@ -357,14 +357,16 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.setup_pipeline()
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(1, len(polling_tasks))
- pollsters = polling_tasks.get(60).pollsters
+ pollsters = polling_tasks.get(60).pollster_matches
self.assertEqual(2, len(pollsters))
per_task_resources = polling_tasks[60].resources
self.assertEqual(2, len(per_task_resources))
+ key = 'test_pipeline-test'
self.assertEqual(set(self.pipeline_cfg[0]['resources']),
- set(per_task_resources['test'].resources))
+ set(per_task_resources[key].get({})))
+ key = 'test_pipeline-testanother'
self.assertEqual(set(self.pipeline_cfg[1]['resources']),
- set(per_task_resources['testanother'].resources))
+ set(per_task_resources[key].get({})))
def test_interval_exception_isolation(self):
self.pipeline_cfg = [
@@ -568,35 +570,57 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
['static_1', 'static_2'])
def test_multiple_pipelines_different_static_resources(self):
- # assert that the amalgation of all static resources for a set
- # of pipelines with a common interval is passed to individual
- # pollsters matching those pipelines
+ # assert that the individual lists of static and discovered resources
+ # for each pipeline with a common interval are passed to individual
+ # pollsters matching each pipeline
self.pipeline_cfg[0]['resources'] = ['test://']
+ self.pipeline_cfg[0]['discovery'] = ['testdiscovery']
self.pipeline_cfg.append({
'name': "another_pipeline",
'interval': 60,
'counters': ['test'],
'resources': ['another://'],
+ 'discovery': ['testdiscoveryanother'],
'transformers': [],
'publishers': ["new"],
})
self.mgr.discovery_manager = self.create_discovery_manager()
- self.Discovery.resources = []
+ self.Discovery.resources = ['discovered_1', 'discovered_2']
+ self.DiscoveryAnother.resources = ['discovered_3', 'discovered_4']
self.setup_pipeline()
polling_tasks = self.mgr.setup_polling_tasks()
self.assertEqual(1, len(polling_tasks))
self.assertTrue(60 in polling_tasks.keys())
self.mgr.interval_task(polling_tasks.get(60))
- self._verify_discovery_params([])
- self.assertEqual(1, len(self.Pollster.samples))
- amalgamated_resources = set(['test://', 'another://'])
- self.assertEqual(amalgamated_resources,
- set(self.Pollster.samples[0][1]))
+ self.assertEqual([None], self.Discovery.params)
+ self.assertEqual([None], self.DiscoveryAnother.params)
+ self.assertEqual(2, len(self.Pollster.samples))
+ samples = self.Pollster.samples
+ test_resources = ['test://', 'discovered_1', 'discovered_2']
+ another_resources = ['another://', 'discovered_3', 'discovered_4']
+ if samples[0][1] == test_resources:
+ self.assertEqual(another_resources, samples[1][1])
+ elif samples[0][1] == another_resources:
+ self.assertEqual(test_resources, samples[1][1])
+ else:
+ self.fail('unexpected sample resources %s' % samples)
+ all_resources = set(test_resources)
+ all_resources.update(another_resources)
+ expected_pipelines = {'test://': 'test_pipeline',
+ 'another://': 'another_pipeline'}
+ sunk_resources = []
for pipe_line in self.mgr.pipeline_manager.pipelines:
self.assertEqual(1, len(pipe_line.publishers[0].samples))
published = pipe_line.publishers[0].samples[0]
- self.assertEqual(amalgamated_resources,
- set(published.resource_metadata['resources']))
+ published_resources = published.resource_metadata['resources']
+ self.assertEqual(3, len(published_resources))
+ self.assertTrue(published_resources[0] in expected_pipelines)
+ self.assertEqual(expected_pipelines[published_resources[0]],
+ pipe_line.name)
+ for published_resource in published_resources:
+ self.assertTrue(published_resource in all_resources)
+ sunk_resources.extend(published_resources)
+ self.assertEqual(all_resources, set(sunk_resources))
def test_multiple_sources_different_discoverers(self):
self.Discovery.resources = ['discovered_1', 'discovered_2']