summaryrefslogtreecommitdiff
path: root/tests/test_aio.py
blob: 7965fa3253c1df0976c95077057edc321929199c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
import asyncio
import libvirt
import libvirtaio
import sys
import unittest
from unittest import mock

import eventmock


class TestLibvirtAio(unittest.TestCase):
    async def _run(self, register):
        def lifecycleCallback(conn, dom, event, detail, domainChangedEvent):
            if (event == libvirt.VIR_DOMAIN_EVENT_STOPPED or
                    event == libvirt.VIR_DOMAIN_EVENT_STARTED):
                domainChangedEvent.set()

        if register:
            libvirtEvents = libvirtaio.virEventRegisterAsyncIOImpl()
        else:
            libvirtEvents = libvirtaio.getCurrentImpl()

        conn = libvirt.open("test:///default")
        dom = conn.lookupByName("test")

        eventRegistered = False
        domainStopped = False
        try:
            # Ensure the VM is running.
            self.assertEqual([libvirt.VIR_DOMAIN_RUNNING, libvirt.VIR_DOMAIN_RUNNING_UNKNOWN], dom.state())
            self.assertTrue(libvirtEvents.is_idle())

            # Register VM start/stopped event handler.
            domainChangedEvent = asyncio.Event()
            conn.domainEventRegisterAny(dom, libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE, lifecycleCallback, domainChangedEvent)
            eventRegistered = True

            self.assertFalse(libvirtEvents.is_idle())

            # Stop the VM.
            dom.destroy()
            domainStopped = True

            # Ensure domain stopped event is received.
            await asyncio.wait_for(domainChangedEvent.wait(), 2)
            self.assertEqual([libvirt.VIR_DOMAIN_SHUTOFF, libvirt.VIR_DOMAIN_SHUTOFF_DESTROYED], dom.state())

            # Start the VM.
            domainChangedEvent.clear()
            domainStopped = False
            dom.create()

            # Ensure domain started event is received.
            await asyncio.wait_for(domainChangedEvent.wait(), 2)
            self.assertEqual([libvirt.VIR_DOMAIN_RUNNING, libvirt.VIR_DOMAIN_RUNNING_BOOTED], dom.state())
            self.assertFalse(libvirtEvents.is_idle())

            # Deregister the VM start/stopped event handler.
            eventRegistered = False
            conn.domainEventDeregisterAny(libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE)

            # Wait for event queue to clear.
            await libvirtEvents.drain()

            # Make sure event queue is cleared.
            self.assertTrue(libvirtEvents.is_idle())

        finally:
            if eventRegistered:
                conn.domainEventDeregisterAny(libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE)

            if domainStopped:
                dom.create()

    @mock.patch('libvirt.virEventRegisterImpl',
                side_effect=eventmock.virEventRegisterImplMock)
    def testEventsWithManualLoopSetup(self, mock_event_register):
        # Register libvirt events after starting the asyncio loop.
        #
        # Manually create and set the event loop against this
        # thread.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

        loop.run_until_complete(self._run(register=True))

        loop.close()
        asyncio.set_event_loop(None)
        mock_event_register.assert_called_once()

    @mock.patch('libvirt.virEventRegisterImpl',
                side_effect=eventmock.virEventRegisterImplMock)
    @unittest.skipIf(sys.version_info < (3,7), "test requires Python 3.7+")
    def testEventsWithAsyncioRun(self, mock_event_register):
        # Register libvirt events after starting the asyncio loop.
        #
        # Use asyncio helper to create and set the event loop
        # against this thread.
        asyncio.run(self._run(register=True))
        mock_event_register.assert_called_once()

    @mock.patch('libvirt.virEventRegisterImpl',
                side_effect=eventmock.virEventRegisterImplMock)
    def testEventsPreInitExplicit(self, mock_event_register):
        # Register libvirt events before starting the asyncio loop.
        #
        # Tell virEventRegisterAsyncIOImpl() explicitly what loop
        # to use before we set a loop for this thread.
        loop = asyncio.new_event_loop()
        libvirtaio.virEventRegisterAsyncIOImpl(loop)
        asyncio.set_event_loop(loop)

        loop.run_until_complete(self._run(register=False))

        loop.close()
        asyncio.set_event_loop(None)
        mock_event_register.assert_called_once()

    @mock.patch('libvirt.virEventRegisterImpl',
                side_effect=eventmock.virEventRegisterImplMock)
    @unittest.skipIf(sys.version_info >= (3,10), "test incompatible with 3.10")
    def testEventsPreInitImplicit(self, mock_event_register):
        # Register libvirt events before starting the asyncio loop.
        #
        # Allow virEventRegisterAsyncIOImpl() to implicitly find the
        # loop we set for this thread.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        libvirtaio.virEventRegisterAsyncIOImpl()

        loop.run_until_complete(self._run(register=False))

        loop.close()
        asyncio.set_event_loop(None)
        mock_event_register.assert_called_once()

    @mock.patch('libvirt.virEventRegisterImpl',
                side_effect=eventmock.virEventRegisterImplMock)
    @unittest.skipIf(sys.version_info >= (3,10), "test incompatible with 3.10")
    def testEventsImplicitLoopInit(self, mock_event_register):
        # Register libvirt events before starting the asyncio loop.
        #
        # Let virEventRegisterAsyncIOImpl() auto-create a default
        # event loop, which we then register against this thread.
        #
        # Historically this often worked if called from the main thead,
        # but since Python 3.10 this triggers a deprecation warning,
        # which will turn into a RuntimeError in a later release.
        libvirtaio.virEventRegisterAsyncIOImpl()
        loop = asyncio.get_event_loop()

        loop.run_until_complete(self._run(register=False))

        loop.close()
        asyncio.set_event_loop(None)
        mock_event_register.assert_called_once()