summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2014-09-29 10:11:01 +0000
committerGerrit Code Review <review@openstack.org>2014-09-29 10:11:01 +0000
commit1c5ce73023660dfc416f7aba44710f3fcfe2d9b7 (patch)
tree04ad90fef86200631da3d34d4b28c3f611d245a2
parentfb34fcbdaf2baff55f41744ac560bd6c725e6407 (diff)
parentfa3f4d6c5dd724a1fb2b811cd798fb80d075192e (diff)
downloadceilometer-1c5ce73023660dfc416f7aba44710f3fcfe2d9b7.tar.gz
Merge "Partition static resources defined in pipeline.yaml"
-rw-r--r--ceilometer/agent.py23
-rw-r--r--ceilometer/tests/agentbase.py49
-rw-r--r--ceilometer/tests/test_utils.py8
-rw-r--r--ceilometer/utils.py4
4 files changed, 77 insertions, 7 deletions
diff --git a/ceilometer/agent.py b/ceilometer/agent.py
index cc0067a7..dad18f2e 100644
--- a/ceilometer/agent.py
+++ b/ceilometer/agent.py
@@ -32,6 +32,7 @@ from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import log
from ceilometer.openstack.common import service as os_service
from ceilometer import pipeline as publish_pipeline
+from ceilometer import utils
LOG = log.getLogger(__name__)
@@ -53,7 +54,14 @@ class Resources(object):
source_discovery = (self.agent_manager.discover(self._discovery,
discovery_cache)
if self._discovery else [])
- return self._resources + source_discovery
+ static_resources = []
+ if self._resources:
+ static_resources_group = self.agent_manager.construct_group_id(
+ utils.hash_of_set(self._resources))
+ p_coord = self.agent_manager.partition_coordinator
+ static_resources = p_coord.extract_my_subset(
+ static_resources_group, self._resources)
+ return static_resources + source_discovery
@staticmethod
def key(source, pollster):
@@ -145,8 +153,15 @@ class AgentManager(os_service.Service):
)
def join_partitioning_groups(self):
- groups = set([self._construct_group_id(d.obj.group_id)
+ groups = set([self.construct_group_id(d.obj.group_id)
for d in self.discovery_manager])
+ # let each set of statically-defined resources have its own group
+ static_resource_groups = set([
+ self.construct_group_id(utils.hash_of_set(p.resources))
+ for p in self.pipeline_manager.pipelines
+ if p.resources
+ ])
+ groups.update(static_resource_groups)
for group in groups:
self.partition_coordinator.join_group(group)
@@ -168,7 +183,7 @@ class AgentManager(os_service.Service):
return polling_tasks
- def _construct_group_id(self, discovery_group_id):
+ def construct_group_id(self, discovery_group_id):
return ('%s-%s' % (self.group_prefix,
discovery_group_id)
if discovery_group_id else None)
@@ -217,7 +232,7 @@ class AgentManager(os_service.Service):
try:
discovered = discoverer.discover(self, param)
partitioned = self.partition_coordinator.extract_my_subset(
- self._construct_group_id(discoverer.group_id),
+ self.construct_group_id(discoverer.group_id),
discovered)
resources.extend(partitioned)
if discovery_cache is not None:
diff --git a/ceilometer/tests/agentbase.py b/ceilometer/tests/agentbase.py
index 39e8d754..b658bea5 100644
--- a/ceilometer/tests/agentbase.py
+++ b/ceilometer/tests/agentbase.py
@@ -38,6 +38,7 @@ from ceilometer.publisher import test as test_publisher
from ceilometer import sample
from ceilometer.tests import base
from ceilometer import transformer
+from ceilometer import utils
class TestSample(sample.Sample):
@@ -297,8 +298,11 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.mgr.discovery_manager = self.create_discovery_manager()
self.mgr.join_partitioning_groups()
p_coord = self.mgr.partition_coordinator
- expected = [mock.call(self.mgr._construct_group_id(g))
- for g in ['another_group', 'global']]
+ static_group_ids = [utils.hash_of_set(p['resources'])
+ for p in self.pipeline_cfg
+ if p['resources']]
+ expected = [mock.call(self.mgr.construct_group_id(g))
+ for g in ['another_group', 'global'] + static_group_ids]
self.assertEqual(len(expected), len(p_coord.join_group.call_args_list))
for c in expected:
self.assertIn(c, p_coord.join_group.call_args_list)
@@ -686,10 +690,11 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
'testdiscoveryanother',
'testdiscoverynonexistent',
'testdiscoveryexception']
+ self.pipeline_cfg[0]['resources'] = []
self.setup_pipeline()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
- expected = [mock.call(self.mgr._construct_group_id(d.obj.group_id),
+ expected = [mock.call(self.mgr.construct_group_id(d.obj.group_id),
d.obj.resources)
for d in self.mgr.discovery_manager
if hasattr(d.obj, 'resources')]
@@ -697,3 +702,41 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
len(p_coord.extract_my_subset.call_args_list))
for c in expected:
self.assertIn(c, p_coord.extract_my_subset.call_args_list)
+
+ def test_static_resources_partitioning(self):
+ p_coord = self.mgr.partition_coordinator
+ self.mgr.default_discovery = []
+ static_resources = ['static_1', 'static_2']
+ static_resources2 = ['static_3', 'static_4']
+ self.pipeline_cfg[0]['resources'] = static_resources
+ self.pipeline_cfg.append({
+ 'name': "test_pipeline2",
+ 'interval': 60,
+ 'counters': ['test', 'test2'],
+ 'resources': static_resources2,
+ 'transformers': [],
+ 'publishers': ["test"],
+ })
+ # have one pipeline without static resources defined
+ self.pipeline_cfg.append({
+ 'name': "test_pipeline3",
+ 'interval': 60,
+ 'counters': ['test', 'test2'],
+ 'resources': [],
+ 'transformers': [],
+ 'publishers': ["test"],
+ })
+ self.setup_pipeline()
+ polling_tasks = self.mgr.setup_polling_tasks()
+ self.mgr.interval_task(polling_tasks.get(60))
+ # Only two groups need to be created, one for each pipeline,
+ # even though counter test is used twice
+ expected = [mock.call(self.mgr.construct_group_id(
+ utils.hash_of_set(resources)),
+ resources)
+ for resources in [static_resources,
+ static_resources2]]
+ self.assertEqual(len(expected),
+ len(p_coord.extract_my_subset.call_args_list))
+ for c in expected:
+ self.assertIn(c, p_coord.extract_my_subset.call_args_list)
diff --git a/ceilometer/tests/test_utils.py b/ceilometer/tests/test_utils.py
index f9a64810..bda3de19 100644
--- a/ceilometer/tests/test_utils.py
+++ b/ceilometer/tests/test_utils.py
@@ -148,6 +148,14 @@ class TestUtils(base.BaseTestCase):
('nested2[1].c', 'B')],
sorted(pairs, key=lambda x: x[0]))
+ def test_hash_of_set(self):
+ x = ['a', 'b']
+ y = ['a', 'b', 'a']
+ z = ['a', 'c']
+ self.assertEqual(utils.hash_of_set(x), utils.hash_of_set(y))
+ self.assertNotEqual(utils.hash_of_set(x), utils.hash_of_set(z))
+ self.assertNotEqual(utils.hash_of_set(y), utils.hash_of_set(z))
+
def test_hash_ring(self):
num_nodes = 10
num_keys = 1000
diff --git a/ceilometer/utils.py b/ceilometer/utils.py
index c703521b..016fb8a3 100644
--- a/ceilometer/utils.py
+++ b/ceilometer/utils.py
@@ -206,6 +206,10 @@ def uniq(dupes, attrs):
return deduped
+def hash_of_set(s):
+ return str(hash(frozenset(s)))
+
+
class HashRing(object):
def __init__(self, nodes, replicas=100):