diff options
author | Nejc Saje <nsaje@redhat.com> | 2014-09-15 08:26:59 -0400 |
---|---|---|
committer | Nejc Saje <nejc@saje.info> | 2014-09-29 06:16:34 +0000 |
commit | fa3f4d6c5dd724a1fb2b811cd798fb80d075192e (patch) | |
tree | d8b0478724db6d786aaff2f009cc80099e55541b | |
parent | d8317189e554c8378eefc615b73726f4b89791cb (diff) | |
download | ceilometer-fa3f4d6c5dd724a1fb2b811cd798fb80d075192e.tar.gz |
Partition static resources defined in pipeline.yaml
Resources statically defined in pipeline.yaml are currently not
subject to workload partitioning, so we can't do HA. If we have
multiple agents running with the same pipeline.yaml, the samples
will be duplicated.
This patch partitions the static resources as well.
Closes-bug: #1369538
Change-Id: Iff3b33db58302fb2e89b1b3722937a031a70be5f
-rw-r--r-- | ceilometer/agent.py | 23 | ||||
-rw-r--r-- | ceilometer/tests/agentbase.py | 49 | ||||
-rw-r--r-- | ceilometer/tests/test_utils.py | 8 | ||||
-rw-r--r-- | ceilometer/utils.py | 4 |
4 files changed, 77 insertions, 7 deletions
diff --git a/ceilometer/agent.py b/ceilometer/agent.py index cc0067a7..dad18f2e 100644 --- a/ceilometer/agent.py +++ b/ceilometer/agent.py @@ -32,6 +32,7 @@ from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common import log from ceilometer.openstack.common import service as os_service from ceilometer import pipeline as publish_pipeline +from ceilometer import utils LOG = log.getLogger(__name__) @@ -53,7 +54,14 @@ class Resources(object): source_discovery = (self.agent_manager.discover(self._discovery, discovery_cache) if self._discovery else []) - return self._resources + source_discovery + static_resources = [] + if self._resources: + static_resources_group = self.agent_manager.construct_group_id( + utils.hash_of_set(self._resources)) + p_coord = self.agent_manager.partition_coordinator + static_resources = p_coord.extract_my_subset( + static_resources_group, self._resources) + return static_resources + source_discovery @staticmethod def key(source, pollster): @@ -145,8 +153,15 @@ class AgentManager(os_service.Service): ) def join_partitioning_groups(self): - groups = set([self._construct_group_id(d.obj.group_id) + groups = set([self.construct_group_id(d.obj.group_id) for d in self.discovery_manager]) + # let each set of statically-defined resources have its own group + static_resource_groups = set([ + self.construct_group_id(utils.hash_of_set(p.resources)) + for p in self.pipeline_manager.pipelines + if p.resources + ]) + groups.update(static_resource_groups) for group in groups: self.partition_coordinator.join_group(group) @@ -168,7 +183,7 @@ class AgentManager(os_service.Service): return polling_tasks - def _construct_group_id(self, discovery_group_id): + def construct_group_id(self, discovery_group_id): return ('%s-%s' % (self.group_prefix, discovery_group_id) if discovery_group_id else None) @@ -217,7 +232,7 @@ class AgentManager(os_service.Service): try: discovered = discoverer.discover(self, param) partitioned = self.partition_coordinator.extract_my_subset( - self._construct_group_id(discoverer.group_id), + self.construct_group_id(discoverer.group_id), discovered) resources.extend(partitioned) if discovery_cache is not None: diff --git a/ceilometer/tests/agentbase.py b/ceilometer/tests/agentbase.py index 39e8d754..b658bea5 100644 --- a/ceilometer/tests/agentbase.py +++ b/ceilometer/tests/agentbase.py @@ -38,6 +38,7 @@ from ceilometer.publisher import test as test_publisher from ceilometer import sample from ceilometer.tests import base from ceilometer import transformer +from ceilometer import utils class TestSample(sample.Sample): @@ -297,8 +298,11 @@ class BaseAgentManagerTestCase(base.BaseTestCase): self.mgr.discovery_manager = self.create_discovery_manager() self.mgr.join_partitioning_groups() p_coord = self.mgr.partition_coordinator - expected = [mock.call(self.mgr._construct_group_id(g)) - for g in ['another_group', 'global']] + static_group_ids = [utils.hash_of_set(p['resources']) + for p in self.pipeline_cfg + if p['resources']] + expected = [mock.call(self.mgr.construct_group_id(g)) + for g in ['another_group', 'global'] + static_group_ids] self.assertEqual(len(expected), len(p_coord.join_group.call_args_list)) for c in expected: self.assertIn(c, p_coord.join_group.call_args_list) @@ -686,10 +690,11 @@ class BaseAgentManagerTestCase(base.BaseTestCase): 'testdiscoveryanother', 'testdiscoverynonexistent', 'testdiscoveryexception'] + self.pipeline_cfg[0]['resources'] = [] self.setup_pipeline() polling_tasks = self.mgr.setup_polling_tasks() self.mgr.interval_task(polling_tasks.get(60)) - expected = [mock.call(self.mgr._construct_group_id(d.obj.group_id), + expected = [mock.call(self.mgr.construct_group_id(d.obj.group_id), d.obj.resources) for d in self.mgr.discovery_manager if hasattr(d.obj, 'resources')] @@ -697,3 +702,41 @@ class BaseAgentManagerTestCase(base.BaseTestCase): len(p_coord.extract_my_subset.call_args_list)) for c in expected: self.assertIn(c, p_coord.extract_my_subset.call_args_list) + + def test_static_resources_partitioning(self): + p_coord = self.mgr.partition_coordinator + self.mgr.default_discovery = [] + static_resources = ['static_1', 'static_2'] + static_resources2 = ['static_3', 'static_4'] + self.pipeline_cfg[0]['resources'] = static_resources + self.pipeline_cfg.append({ + 'name': "test_pipeline2", + 'interval': 60, + 'counters': ['test', 'test2'], + 'resources': static_resources2, + 'transformers': [], + 'publishers': ["test"], + }) + # have one pipeline without static resources defined + self.pipeline_cfg.append({ + 'name': "test_pipeline3", + 'interval': 60, + 'counters': ['test', 'test2'], + 'resources': [], + 'transformers': [], + 'publishers': ["test"], + }) + self.setup_pipeline() + polling_tasks = self.mgr.setup_polling_tasks() + self.mgr.interval_task(polling_tasks.get(60)) + # Only two groups need to be created, one for each pipeline, + # even though counter test is used twice + expected = [mock.call(self.mgr.construct_group_id( + utils.hash_of_set(resources)), + resources) + for resources in [static_resources, + static_resources2]] + self.assertEqual(len(expected), + len(p_coord.extract_my_subset.call_args_list)) + for c in expected: + self.assertIn(c, p_coord.extract_my_subset.call_args_list) diff --git a/ceilometer/tests/test_utils.py b/ceilometer/tests/test_utils.py index f9a64810..bda3de19 100644 --- a/ceilometer/tests/test_utils.py +++ b/ceilometer/tests/test_utils.py @@ -148,6 +148,14 @@ class TestUtils(base.BaseTestCase): ('nested2[1].c', 'B')], sorted(pairs, key=lambda x: x[0])) + def test_hash_of_set(self): + x = ['a', 'b'] + y = ['a', 'b', 'a'] + z = ['a', 'c'] + self.assertEqual(utils.hash_of_set(x), utils.hash_of_set(y)) + self.assertNotEqual(utils.hash_of_set(x), utils.hash_of_set(z)) + self.assertNotEqual(utils.hash_of_set(y), utils.hash_of_set(z)) + def test_hash_ring(self): num_nodes = 10 num_keys = 1000 diff --git a/ceilometer/utils.py b/ceilometer/utils.py index c703521b..016fb8a3 100644 --- a/ceilometer/utils.py +++ b/ceilometer/utils.py @@ -206,6 +206,10 @@ def uniq(dupes, attrs): return deduped +def hash_of_set(s): + return str(hash(frozenset(s))) + + class HashRing(object): def __init__(self, nodes, replicas=100): |