author     Nejc Saje <nsaje@redhat.com>    2014-09-15 08:26:59 -0400
committer  Nejc Saje <nejc@saje.info>      2014-09-29 06:16:34 +0000
commit     fa3f4d6c5dd724a1fb2b811cd798fb80d075192e (patch)
tree       d8b0478724db6d786aaff2f009cc80099e55541b
parent     d8317189e554c8378eefc615b73726f4b89791cb (diff)
download   ceilometer-fa3f4d6c5dd724a1fb2b811cd798fb80d075192e.tar.gz
Partition static resources defined in pipeline.yaml
Resources statically defined in pipeline.yaml are currently not subject to
workload partitioning, so we can't do HA. If we have multiple agents running
with the same pipeline.yaml, the samples will be duplicated.

This patch partitions the static resources as well.

Closes-bug: #1369538
Change-Id: Iff3b33db58302fb2e89b1b3722937a031a70be5f
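In practice the change means that every agent derives the same group id from a
hash of the static resource set and then asks the partition coordinator for only
its own slice of that set, so agents sharing a pipeline.yaml poll disjoint
subsets. A minimal standalone sketch of the idea; the agent list, the modulo
assignment and the extract_my_subset() stand-in below are illustrative
assumptions, not the real PartitionCoordinator API:

import hashlib

def hash_of_set(s):
    # Mirrors the new utils.hash_of_set(): an identifier for a resource set
    # that ignores ordering and duplicate entries.
    return str(hash(frozenset(s)))

def extract_my_subset(agent_id, all_agents, resources):
    # Illustrative stand-in for PartitionCoordinator.extract_my_subset():
    # deterministically hand each resource to exactly one member of the group.
    my_index = sorted(all_agents).index(agent_id)
    return [r for r in resources
            if int(hashlib.md5(r.encode()).hexdigest(), 16) % len(all_agents)
            == my_index]

agents = ['agent-1', 'agent-2']
static_resources = ['static_1', 'static_2', 'static_3', 'static_4']
group_id = 'central-' + hash_of_set(static_resources)  # identical on every agent

for agent in agents:
    # Each agent polls only its own subset, so no sample is produced twice.
    print(agent, group_id, extract_my_subset(agent, agents, static_resources))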
-rw-r--r--  ceilometer/agent.py              | 23
-rw-r--r--  ceilometer/tests/agentbase.py    | 49
-rw-r--r--  ceilometer/tests/test_utils.py   |  8
-rw-r--r--  ceilometer/utils.py              |  4
4 files changed, 77 insertions(+), 7 deletions(-)
diff --git a/ceilometer/agent.py b/ceilometer/agent.py
index cc0067a7..dad18f2e 100644
--- a/ceilometer/agent.py
+++ b/ceilometer/agent.py
@@ -32,6 +32,7 @@ from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import log
from ceilometer.openstack.common import service as os_service
from ceilometer import pipeline as publish_pipeline
+from ceilometer import utils
LOG = log.getLogger(__name__)
@@ -53,7 +54,14 @@ class Resources(object):
source_discovery = (self.agent_manager.discover(self._discovery,
discovery_cache)
if self._discovery else [])
- return self._resources + source_discovery
+ static_resources = []
+ if self._resources:
+ static_resources_group = self.agent_manager.construct_group_id(
+ utils.hash_of_set(self._resources))
+ p_coord = self.agent_manager.partition_coordinator
+ static_resources = p_coord.extract_my_subset(
+ static_resources_group, self._resources)
+ return static_resources + source_discovery
@staticmethod
def key(source, pollster):
@@ -145,8 +153,15 @@ class AgentManager(os_service.Service):
)
def join_partitioning_groups(self):
- groups = set([self._construct_group_id(d.obj.group_id)
+ groups = set([self.construct_group_id(d.obj.group_id)
for d in self.discovery_manager])
+ # let each set of statically-defined resources have its own group
+ static_resource_groups = set([
+ self.construct_group_id(utils.hash_of_set(p.resources))
+ for p in self.pipeline_manager.pipelines
+ if p.resources
+ ])
+ groups.update(static_resource_groups)
for group in groups:
self.partition_coordinator.join_group(group)
@@ -168,7 +183,7 @@ class AgentManager(os_service.Service):
return polling_tasks
- def _construct_group_id(self, discovery_group_id):
+ def construct_group_id(self, discovery_group_id):
return ('%s-%s' % (self.group_prefix,
discovery_group_id)
if discovery_group_id else None)
@@ -217,7 +232,7 @@ class AgentManager(os_service.Service):
try:
discovered = discoverer.discover(self, param)
partitioned = self.partition_coordinator.extract_my_subset(
- self._construct_group_id(discoverer.group_id),
+ self.construct_group_id(discoverer.group_id),
discovered)
resources.extend(partitioned)
if discovery_cache is not None:
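
The join_partitioning_groups() hunk above gives each distinct static resource
set its own coordination group, alongside the existing per-discoverer groups.
A rough sketch of that grouping logic with made-up pipeline data (plain dicts
standing in for Pipeline objects, and 'central' as an assumed group prefix):

def construct_group_id(group_prefix, discovery_group_id):
    # Same shape as AgentManager.construct_group_id() after the rename.
    return ('%s-%s' % (group_prefix, discovery_group_id)
            if discovery_group_id else None)

def hash_of_set(s):
    return str(hash(frozenset(s)))

# Hypothetical pipelines: two with static resources, one without.
pipelines = [
    {'name': 'test_pipeline',  'resources': ['static_1', 'static_2']},
    {'name': 'test_pipeline2', 'resources': ['static_3', 'static_4']},
    {'name': 'test_pipeline3', 'resources': []},
]

static_resource_groups = set(
    construct_group_id('central', hash_of_set(p['resources']))
    for p in pipelines if p['resources'])

# Two groups are joined; the pipeline with an empty resource list adds none.
assert len(static_resource_groups) == 2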
diff --git a/ceilometer/tests/agentbase.py b/ceilometer/tests/agentbase.py
index 39e8d754..b658bea5 100644
--- a/ceilometer/tests/agentbase.py
+++ b/ceilometer/tests/agentbase.py
@@ -38,6 +38,7 @@ from ceilometer.publisher import test as test_publisher
from ceilometer import sample
from ceilometer.tests import base
from ceilometer import transformer
+from ceilometer import utils
class TestSample(sample.Sample):
@@ -297,8 +298,11 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.mgr.discovery_manager = self.create_discovery_manager()
self.mgr.join_partitioning_groups()
p_coord = self.mgr.partition_coordinator
- expected = [mock.call(self.mgr._construct_group_id(g))
- for g in ['another_group', 'global']]
+ static_group_ids = [utils.hash_of_set(p['resources'])
+ for p in self.pipeline_cfg
+ if p['resources']]
+ expected = [mock.call(self.mgr.construct_group_id(g))
+ for g in ['another_group', 'global'] + static_group_ids]
self.assertEqual(len(expected), len(p_coord.join_group.call_args_list))
for c in expected:
self.assertIn(c, p_coord.join_group.call_args_list)
@@ -686,10 +690,11 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
'testdiscoveryanother',
'testdiscoverynonexistent',
'testdiscoveryexception']
+ self.pipeline_cfg[0]['resources'] = []
self.setup_pipeline()
polling_tasks = self.mgr.setup_polling_tasks()
self.mgr.interval_task(polling_tasks.get(60))
- expected = [mock.call(self.mgr._construct_group_id(d.obj.group_id),
+ expected = [mock.call(self.mgr.construct_group_id(d.obj.group_id),
d.obj.resources)
for d in self.mgr.discovery_manager
if hasattr(d.obj, 'resources')]
@@ -697,3 +702,41 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
len(p_coord.extract_my_subset.call_args_list))
for c in expected:
self.assertIn(c, p_coord.extract_my_subset.call_args_list)
+
+ def test_static_resources_partitioning(self):
+ p_coord = self.mgr.partition_coordinator
+ self.mgr.default_discovery = []
+ static_resources = ['static_1', 'static_2']
+ static_resources2 = ['static_3', 'static_4']
+ self.pipeline_cfg[0]['resources'] = static_resources
+ self.pipeline_cfg.append({
+ 'name': "test_pipeline2",
+ 'interval': 60,
+ 'counters': ['test', 'test2'],
+ 'resources': static_resources2,
+ 'transformers': [],
+ 'publishers': ["test"],
+ })
+ # have one pipeline without static resources defined
+ self.pipeline_cfg.append({
+ 'name': "test_pipeline3",
+ 'interval': 60,
+ 'counters': ['test', 'test2'],
+ 'resources': [],
+ 'transformers': [],
+ 'publishers': ["test"],
+ })
+ self.setup_pipeline()
+ polling_tasks = self.mgr.setup_polling_tasks()
+ self.mgr.interval_task(polling_tasks.get(60))
+ # Only two groups need to be created, one for each pipeline,
+ # even though counter test is used twice
+ expected = [mock.call(self.mgr.construct_group_id(
+ utils.hash_of_set(resources)),
+ resources)
+ for resources in [static_resources,
+ static_resources2]]
+ self.assertEqual(len(expected),
+ len(p_coord.extract_my_subset.call_args_list))
+ for c in expected:
+ self.assertIn(c, p_coord.extract_my_subset.call_args_list)
diff --git a/ceilometer/tests/test_utils.py b/ceilometer/tests/test_utils.py
index f9a64810..bda3de19 100644
--- a/ceilometer/tests/test_utils.py
+++ b/ceilometer/tests/test_utils.py
@@ -148,6 +148,14 @@ class TestUtils(base.BaseTestCase):
('nested2[1].c', 'B')],
sorted(pairs, key=lambda x: x[0]))
+ def test_hash_of_set(self):
+ x = ['a', 'b']
+ y = ['a', 'b', 'a']
+ z = ['a', 'c']
+ self.assertEqual(utils.hash_of_set(x), utils.hash_of_set(y))
+ self.assertNotEqual(utils.hash_of_set(x), utils.hash_of_set(z))
+ self.assertNotEqual(utils.hash_of_set(y), utils.hash_of_set(z))
+
def test_hash_ring(self):
num_nodes = 10
num_keys = 1000
diff --git a/ceilometer/utils.py b/ceilometer/utils.py
index c703521b..016fb8a3 100644
--- a/ceilometer/utils.py
+++ b/ceilometer/utils.py
@@ -206,6 +206,10 @@ def uniq(dupes, attrs):
return deduped
+def hash_of_set(s):
+ return str(hash(frozenset(s)))
+
+
class HashRing(object):
def __init__(self, nodes, replicas=100):
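
The new utils.hash_of_set() helper relies on frozenset, so it is insensitive to
ordering and to duplicate entries, which is exactly what test_hash_of_set
asserts above. A quick illustration (hash collisions aside):

def hash_of_set(s):
    return str(hash(frozenset(s)))

assert hash_of_set(['a', 'b']) == hash_of_set(['b', 'a', 'a'])  # order/dupes ignored
assert hash_of_set(['a', 'b']) != hash_of_set(['a', 'c'])       # different sets differ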