summaryrefslogtreecommitdiff
path: root/src/apscheduler/eventbrokers/mqtt.py
blob: dbdffe4e99937fa6395e98c31009e6f74174b9f8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
from __future__ import annotations

from concurrent.futures import Future
from typing import Any, Optional

import attr
from paho.mqtt.client import Client, MQTTMessage
from paho.mqtt.properties import Properties
from paho.mqtt.reasoncodes import ReasonCodes

from ..abc import Serializer
from ..events import Event
from ..serializers.json import JSONSerializer
from ..util import reentrant
from .base import DistributedEventBrokerMixin
from .local import LocalEventBroker


@reentrant
@attr.define(eq=False)
class MQTTEventBroker(LocalEventBroker, DistributedEventBrokerMixin):
    """
    Event broker that relays events through a single topic on an MQTT broker.

    Events passed to :meth:`publish` are converted to a notification payload and
    published to ``topic``. Messages received on that same topic are turned back into
    events and delivered to local subscribers via ``publish_local()``.

    Entering the broker as a context manager connects the client, starts its network
    loop and blocks (for up to 10 seconds) until the topic subscription has been
    acknowledged.

    :param client: the paho MQTT client to use (its ``on_connect``, ``on_message`` and
        ``on_subscribe`` callbacks are overwritten on entry)
    :param serializer: serializer used for events (defaults to a ``JSONSerializer``)
    :param host: host name or IP address to connect to
    :param port: TCP port number to connect to
    :param topic: topic on which to send and receive the event notifications
    :param subscribe_qos: MQTT QoS to use for subscribing to the topic
    :param publish_qos: MQTT QoS to use for publishing messages
    """

    client: Client
    serializer: Serializer = attr.field(factory=JSONSerializer)
    host: str = attr.field(kw_only=True, default='localhost')
    port: int = attr.field(kw_only=True, default=1883)
    topic: str = attr.field(kw_only=True, default='apscheduler')
    subscribe_qos: int = attr.field(kw_only=True, default=0)
    publish_qos: int = attr.field(kw_only=True, default=0)
    # Resolved by the client callbacks once the subscription is acknowledged (or failed
    # with an exception if connecting/subscribing went wrong); re-created on each entry
    _ready_future: Future[None] = attr.field(init=False)

    def __enter__(self):
        """
        Connect to the MQTT broker and wait until the topic subscription is active.

        :raises concurrent.futures.TimeoutError: if the subscription was not
            acknowledged within 10 seconds
        :raises ConnectionError: if the broker reported a failed connection attempt
        """
        super().__enter__()
        self._ready_future = Future()
        self.client.enable_logger(self._logger)
        self.client.on_connect = self._on_connect
        self.client.on_message = self._on_message
        self.client.on_subscribe = self._on_subscribe
        self.client.connect(self.host, self.port)
        self.client.loop_start()
        try:
            self._ready_future.result(10)
        except BaseException:
            # __exit__() is not invoked when __enter__() raises, so the teardown
            # callbacks registered below would never run; without this the network
            # loop thread would be left running after a failed start.
            try:
                self.client.disconnect()
            finally:
                self.client.loop_stop(force=True)

            raise

        def stop_loop(exc_type, exc_val, exc_tb) -> None:
            # Deliberately returns None: assuming _exit_stack is a contextlib.ExitStack,
            # a truthy return value from a pushed exit callback would suppress the
            # in-flight exception, so loop_stop()'s return value must not leak out.
            self.client.loop_stop(force=bool(exc_type))

        # Exit callbacks run in reverse order of registration: disconnect first, then
        # stop the network loop
        self._exit_stack.push(stop_loop)
        self._exit_stack.callback(self.client.disconnect)
        return self

    def _on_connect(self, client: Client, userdata: Any, flags: dict[str, Any],
                    rc: ReasonCodes | int, properties: Optional[Properties] = None) -> None:
        """
        Subscribe to the topic once the broker has responded to the connection attempt.

        A nonzero ``rc`` means the broker did not accept the connection; in that case
        the failure is reported to whoever is waiting in :meth:`__enter__` instead of
        attempting to subscribe.
        """
        if rc != 0:
            # The future may already be resolved if this is a later (re)connection
            if not self._ready_future.done():
                self._ready_future.set_exception(
                    ConnectionError(f'MQTT broker refused the connection: {rc}'))

            return

        try:
            client.subscribe(self.topic, qos=self.subscribe_qos)
        except Exception as exc:
            if not self._ready_future.done():
                self._ready_future.set_exception(exc)

            raise

    def _on_subscribe(self, client: Client, userdata: Any, mid: int,
                      granted_qos: list[int] | list[ReasonCodes],
                      properties: Optional[Properties] = None) -> None:
        """
        Mark the broker as ready when the subscription has been acknowledged.

        ``properties`` is only passed by MQTT v5 clients, hence the default value.
        """
        # This callback fires again for every later subscription acknowledgement (e.g.
        # after _on_connect() re-subscribes following a reconnect); resolving an
        # already resolved future would raise InvalidStateError.
        if not self._ready_future.done():
            self._ready_future.set_result(None)

    def _on_message(self, client: Client, userdata: Any, msg: MQTTMessage) -> None:
        """Rebuild an event from a received message and deliver it to local subscribers."""
        # reconstitute_event() is an inherited helper; a None result means that there
        # is nothing to deliver for this payload
        event = self.reconstitute_event(msg.payload)
        if event is not None:
            self.publish_local(event)

    def publish(self, event: Event) -> None:
        """
        Publish the given event to the MQTT topic.

        The event is not delivered to local subscribers here; that happens in
        :meth:`_on_message` when a message arrives on the subscribed topic.

        :param event: the event to publish
        """
        notification = self.generate_notification(event)
        self.client.publish(self.topic, notification, qos=self.publish_qos)