summaryrefslogtreecommitdiff
path: root/ceilometer/agent/plugin_base.py
blob: c72ac975b34445b639af81a6743cc43bed2d16e1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
#
# Copyright 2012 New Dream Network, LLC (DreamHost)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Base class for plugins.
"""

import abc
import collections

from oslo_context import context
from oslo_log import log
import oslo_messaging
import six
from stevedore import extension

from ceilometer import messaging

# Module-level logger, named after this module.
LOG = log.getLogger(__name__)

# Immutable record pairing an exchange with its topics.
ExchangeTopics = collections.namedtuple('ExchangeTopics', 'exchange topics')


class PluginBase(object):
    """Common root class shared by every plugin type."""


@six.add_metaclass(abc.ABCMeta)
class NotificationBase(PluginBase):
    """Base class for plugins that support the notification API.

    Every notification priority level is exposed as an endpoint method
    taking ``(ctxt, publisher_id, event_type, payload, metadata)``. Only
    ``info`` and ``sample`` produce samples; the other levels consume the
    message and drop it.
    """

    def __init__(self, manager):
        """Build the endpoint filter rule and remember the manager.

        :param manager: object whose ``publisher(context)`` context manager
                        is used by ``to_samples_and_publish``.
        """
        super(NotificationBase, self).__init__()
        # NOTE(gordc): this is filter rule used by oslo.messaging to dispatch
        # messages to an endpoint.
        # The rule is only set when the plugin declares event types; without
        # any, no ``filter_rule`` attribute is created on the instance.
        if self.event_types:
            self.filter_rule = oslo_messaging.NotificationFilter(
                event_type='|'.join(self.event_types))
        self.manager = manager

    @staticmethod
    def get_notification_topics(conf):
        """Return the notification topics found in the configuration.

        ``conf.notification_topics`` is used when that option is present,
        otherwise ``conf.oslo_messaging_notifications.topics``.

        :param conf: Configuration.
        """
        if 'notification_topics' in conf:
            return conf.notification_topics
        return conf.oslo_messaging_notifications.topics

    @abc.abstractproperty
    def event_types(self):
        """Return a sequence of strings.

        Strings are defining the event types to be given to this plugin.
        """

    @abc.abstractmethod
    def get_targets(self, conf):
        """Return a sequence of oslo.messaging.Target.

        Sequence is defining the exchange and topics to be connected for this
        plugin.
        :param conf: Configuration.
        """

    @abc.abstractmethod
    def process_notification(self, message):
        """Return a sequence of Counter instances for the given message.

        :param message: Message to process.
        """

    # NOTE: this must be a regular method, not a staticmethod. It declares
    # ``self`` and its aliases below are invoked on instances with only the
    # five notification arguments; as a staticmethod no ``self`` would be
    # bound and every such call would raise TypeError.
    def _consume_and_drop(self, ctxt, publisher_id, event_type, payload,
                          metadata):
        """RPC endpoint for useless notification level"""
        # NOTE(sileht): nothing special todo here, but because we listen
        # for the generic notification exchange we have to consume all its
        # queues

    # Priority levels that are consumed and discarded.
    audit = _consume_and_drop
    debug = _consume_and_drop
    warn = _consume_and_drop
    error = _consume_and_drop
    critical = _consume_and_drop

    def info(self, ctxt, publisher_id, event_type, payload, metadata):
        """RPC endpoint for notification messages at info level

        When another service sends a notification over the message
        bus, this method receives it. The notification is converted with
        ``messaging.convert_to_old_notification_format`` and handed to
        ``to_samples_and_publish`` under an admin context.

        :param ctxt: oslo.messaging context
        :param publisher_id: publisher of the notification
        :param event_type: type of notification
        :param payload: notification payload
        :param metadata: metadata about the notification

        """
        notification = messaging.convert_to_old_notification_format(
            'info', ctxt, publisher_id, event_type, payload, metadata)
        self.to_samples_and_publish(context.get_admin_context(), notification)

    def sample(self, ctxt, publisher_id, event_type, payload, metadata):
        """RPC endpoint for notification messages at sample level

        When another service sends a notification over the message
        bus at sample priority, this method receives it. The notification
        is converted with ``messaging.convert_to_old_notification_format``
        and handed to ``to_samples_and_publish`` under an admin context.

        :param ctxt: oslo.messaging context
        :param publisher_id: publisher of the notification
        :param event_type: type of notification
        :param payload: notification payload
        :param metadata: metadata about the notification

        """
        notification = messaging.convert_to_old_notification_format(
            'sample', ctxt, publisher_id, event_type, payload, metadata)
        self.to_samples_and_publish(context.get_admin_context(), notification)

    def to_samples_and_publish(self, context, notification):
        """Publish the samples produced by *process_notification*.

        The samples for the given notification are collected into a list
        and passed to the publisher obtained from the manager. Nothing is
        returned.

        :param context: Execution context from the service or RPC call
        :param notification: The notification to process.
        """
        with self.manager.publisher(context) as p:
            p(list(self.process_notification(notification)))


class NonMetricNotificationBase(object):
    """Marker base for non-measurement meters.

    Several historical meters are not real measurements and would be better
    captured as events. Deriving them from this shared base makes it
    possible to disable those invalid meters.
    """


class ExtensionLoadError(Exception):
    """Raised when a pollster plugin cannot be loaded.

    PollsterBase calls its setup_environment hook while a pollster is being
    loaded, to prepare the required HW/SW dependencies. Any exception coming
    out of that hook is re-raised as ExtensionLoadError, and loading of the
    pollster is then skipped.
    """


class PollsterPermanentError(Exception):
    """Signal an unrecoverable polling failure.

    A pollster raises this exception, together with the failed resources,
    when an error that cannot be recovered from occurs while polling, so
    that it is not polled for them any more. The resources are those among
    the ``resources`` parameter of get_samples that caused the polling
    error.
    """

    def __init__(self, resources):
        # Keep the failing resources available to whoever catches the error.
        self.fail_res_list = resources


@six.add_metaclass(abc.ABCMeta)
class PollsterBase(PluginBase):
    """Common parent of the plugins implementing the polling API."""

    def __init__(self):
        super(PollsterBase, self).__init__()
        # Whatever goes wrong while preparing the environment is reported as
        # a load error.
        try:
            self.setup_environment()
        except Exception as exc:
            raise ExtensionLoadError(exc)

    def setup_environment(self):
        """Prepare the environment the pollster depends on.

        Subclasses may override this hook for their own needs. It runs from
        the constructor, and any exception it raises prevents the pollster
        from being loaded.
        """

    @abc.abstractproperty
    def default_discovery(self):
        """Discovery used by default for this pollster.

        A pollster can obtain the list of resources to poll in three ways,
        given here in ascending order of precedence:
        1. from the per-agent discovery,
        2. from the per-pollster discovery (defined here)
        3. from the per-pipeline configured discovery and/or per-pipeline
        configured static resources.

        Set this property to None when the pollster should only receive
        resources from #1 or #3.
        """

    @abc.abstractmethod
    def get_samples(self, manager, cache, resources):
        """Poll the resources and return a sequence of Counter instances.

        :param manager: The service manager class invoking the plugin.
        :param cache: A dictionary letting pollsters share data with each
                      other when recomputing it would be expensive (e.g.,
                      asking another service for a list of objects).
        :param resources: A list of resources the pollster will get data
                          from. How it is used is up to the specific
                          pollster. It usually comes from a discovery, see
                          ``default_discovery`` for more information.

        """

    @classmethod
    def build_pollsters(cls):
        """Return a list of (name, pollster) tuples.

        ``name`` is the name of the meter the pollster would return and
        ``pollster`` is a pollster object instance. A pollster implementing
        this method should be registered in the ceilometer.builder.xxx
        namespace rather than ceilometer.poll.xxx. The base implementation
        builds nothing.
        """
        return list()

    @classmethod
    def get_pollsters_extensions(cls):
        """Return a list of stevedore extensions.

        Each extension wraps one of the pollster object instances returned
        by build_pollsters. Any failure while building them is re-raised as
        ExtensionLoadError.
        """
        try:
            wrapped = [extension.Extension(meter_name, None, cls, instance)
                       for meter_name, instance in cls.build_pollsters()]
        except Exception as exc:
            raise ExtensionLoadError(exc)
        return wrapped


@six.add_metaclass(abc.ABCMeta)
class DiscoveryBase(object):
    """Base class for plugins that discover resources to monitor."""

    # Service type that must be present in the keystone catalog for this
    # discovery to work.
    KEYSTONE_REQUIRED_FOR_SERVICE = None

    @abc.abstractmethod
    def discover(self, manager, param=None):
        """Discover resources to monitor.

        Prefer the most fine-grained discovery available, so that the work
        is spread as evenly as possible among multiple agents (if they
        exist).

        For example:
        a pollster able to poll individual resources separately should come
        with its own discovery implementation for those resources. If it can
        only poll per-tenant, the `TenantDiscovery` should be used. If even
        that is not possible, use `EndpointDiscovery` (see their respective
        docstrings).

        :param manager: The service manager class invoking the plugin.
        :param param: an optional parameter to guide the discovery
        """

    @property
    def group_id(self):
        """Return the group id of this discovery.

        All running discoveries sharing a group_id should return the same
        set of resources at a given point in time. By default a discovery
        belongs to a global group, meaning that every discovery of its type
        running anywhere in the cloud returns the same set of resources.

        Override this property to group localized discoveries correctly.
        For example, compute discovery is localized to a host, which is
        reflected in its group_id.

        A None value means that this discovery does not want to take part
        in workload partitioning at all.
        """
        return 'global'