summaryrefslogtreecommitdiff
path: root/tests/test_aio.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_aio.py')
-rw-r--r--tests/test_aio.py114
1 files changed, 114 insertions, 0 deletions
diff --git a/tests/test_aio.py b/tests/test_aio.py
new file mode 100644
index 0000000..fb8c6b6
--- /dev/null
+++ b/tests/test_aio.py
@@ -0,0 +1,114 @@
+import asyncio
+import libvirt
+import libvirtaio
+import sys
+import unittest
+import pytest
+
+
+class TestLibvirtAio(unittest.TestCase):
+ async def _run(self, register):
+ def lifecycleCallback(conn, dom, event, detail, domainChangedEvent):
+ if (event == libvirt.VIR_DOMAIN_EVENT_STOPPED or
+ event == libvirt.VIR_DOMAIN_EVENT_STARTED):
+ domainChangedEvent.set()
+
+ if register:
+ libvirtEvents = libvirtaio.virEventRegisterAsyncIOImpl()
+ else:
+ libvirtEvents = libvirtaio.getCurrentImpl()
+
+ conn = libvirt.open("test:///default")
+ dom = conn.lookupByName("test")
+
+ eventRegistered = False
+ domainStopped = False
+ try:
+ # Ensure the VM is running.
+ self.assertEqual([libvirt.VIR_DOMAIN_RUNNING, libvirt.VIR_DOMAIN_RUNNING_UNKNOWN], dom.state())
+ self.assertTrue(libvirtEvents.is_idle())
+
+ # Register VM start/stopped event handler.
+ domainChangedEvent = asyncio.Event()
+ conn.domainEventRegisterAny(dom, libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE, lifecycleCallback, domainChangedEvent)
+ eventRegistered = True
+
+ self.assertFalse(libvirtEvents.is_idle())
+
+ # Stop the VM.
+ dom.destroy()
+ domainStopped = True
+
+ # Ensure domain stopped event is received.
+ await asyncio.wait_for(domainChangedEvent.wait(), 2)
+ self.assertEqual([libvirt.VIR_DOMAIN_SHUTOFF, libvirt.VIR_DOMAIN_SHUTOFF_DESTROYED], dom.state())
+
+ # Start the VM.
+ domainChangedEvent.clear()
+ domainStopped = False
+ dom.create()
+
+ # Ensure domain started event is received.
+ await asyncio.wait_for(domainChangedEvent.wait(), 2)
+ self.assertEqual([libvirt.VIR_DOMAIN_RUNNING, libvirt.VIR_DOMAIN_RUNNING_BOOTED], dom.state())
+ self.assertFalse(libvirtEvents.is_idle())
+
+ # Deregister the VM start/stopped event handler.
+ eventRegistered = False
+ conn.domainEventDeregisterAny(libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE)
+
+ # Wait for event queue to clear.
+ await libvirtEvents.drain()
+
+ # Make sure event queue is cleared.
+ self.assertTrue(libvirtEvents.is_idle())
+
+ finally:
+ if eventRegistered:
+ conn.domainEventDeregisterAny(libvirt.VIR_DOMAIN_EVENT_ID_LIFECYCLE)
+
+ if domainStopped:
+ dom.create()
+
+ @pytest.mark.separate_process
+ def testEventsWithManualLoopSetup(self):
+ loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(loop)
+
+ loop.run_until_complete(self._run(register=True))
+
+ loop.close()
+ asyncio.set_event_loop(None)
+
+ @pytest.mark.separate_process
+ @unittest.skipIf(sys.version_info < (3,7), "test requires Python 3.7+")
+ def testEventsWithAsyncioRun(self):
+ asyncio.run(self._run(register=True))
+
+ @pytest.mark.separate_process
+ @unittest.skipIf(sys.version_info >= (3,10), "test not compatible with Python 3.10+")
+ def testEventsPreInit(self):
+ # Initialize libvirt events before setting the event loop. This is not recommended.
+ # But is supported in older version of Python for the sake of back-compat.
+ loop = asyncio.new_event_loop()
+ libvirtaio.virEventRegisterAsyncIOImpl(loop)
+ asyncio.set_event_loop(loop)
+
+ loop.run_until_complete(self._run(register=False))
+
+ loop.close()
+ asyncio.set_event_loop(None)
+
+ @pytest.mark.separate_process
+ def testEventsImplicitLoopInit(self):
+ # Allow virEventRegisterAsyncIOImpl() to init the event loop by calling
+ # asyncio.get_event_loop(). This is not recommended and probably only works by
+ # accident. But is supported for now for the sake of back-compat. For Python
+ # 3.10+, asyncio will report deprecation warnings.
+ libvirtaio.virEventRegisterAsyncIOImpl()
+ loop = asyncio.get_event_loop()
+
+ loop.run_until_complete(self._run(register=False))
+
+ loop.close()
+ asyncio.set_event_loop(None)