path: root/
diff options
authorWojtek Porczyk <>2017-03-17 14:35:53 +0100
committerDaniel P. Berrange <>2017-04-04 15:28:50 +0100
commite9850106744e70b890e5fdfec9b1b5fd2c7a6ac5 (patch)
tree10ff291960cbc72a64330d2dd0b1fb25d0aa034e /
parent72e237f7b9a8c8b353753a7e9c0a6fdbb3d553c5 (diff)
Add asyncio event loop implementation
This is usable only on python >= 3.4 (or 3.3 with out-of-tree asyncio), however it should be harmless for anyone with older python versions. In simplest case, to have the callbacks queued on the default loop: >>> import libvirtaio >>> libvirtaio.virEventRegisterAsyncIOImpl() The function is not present on non-compatible platforms. Signed-off-by: Wojtek Porczyk <>
Diffstat (limited to '')
1 files changed, 399 insertions, 0 deletions
diff --git a/ b/
new file mode 100644
index 0000000..7c8c396
--- /dev/null
+++ b/
@@ -0,0 +1,399 @@
+# libvirtaio -- asyncio adapter for libvirt
+# Copyright (C) 2017 Wojtek Porczyk <>
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# Lesser General Public License for more details.
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, see
+# <>.
+'''Libvirt event loop implementation using asyncio
+Register the implementation of default loop:
+ >>> import libvirtaio
+ >>> libvirtaio.virEventRegisterAsyncIOImpl()
+.. seealso::
+__author__ = 'Wojtek Porczyk <>'
+__license__ = 'LGPL-2.1+'
+__all__ = ['virEventAsyncIOImpl', 'virEventRegisterAsyncIOImpl']
+import asyncio
+import itertools
+import logging
+import warnings
+import libvirt
+ from asyncio import ensure_future
+except ImportError:
+ from asyncio import async as ensure_future
+class Callback(object):
+ '''Base class for holding callback
+ :param virEventAsyncIOImpl impl: the implementation in which we run
+ :param cb: the callback itself
+ :param opaque: the opaque tuple passed by libvirt
+ '''
+ # pylint: disable=too-few-public-methods
+ _iden_counter = itertools.count()
+ def __init__(self, impl, cb, opaque, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.iden = next(self._iden_counter)
+ self.impl = impl
+ self.cb = cb
+ self.opaque = opaque
+ assert self.iden not in self.impl.callbacks, \
+ 'found {} callback: {!r}'.format(
+ self.iden, self.impl.callbacks[self.iden])
+ self.impl.callbacks[self.iden] = self
+ def __repr__(self):
+ return '<{} iden={}>'.format(self.__class__.__name__, self.iden)
+ def close(self):
+ '''Schedule *ff* callback'''
+ self.impl.log.debug('callback %d close(), scheduling ff', self.iden)
+ self.impl.schedule_ff_callback(self.opaque)
+# file descriptors
+class Descriptor(object):
+ '''Manager of one file descriptor
+ :param virEventAsyncIOImpl impl: the implementation in which we run
+ :param int fd: the file descriptor
+ '''
+ def __init__(self, impl, fd):
+ self.impl = impl
+ self.fd = fd
+ self.callbacks = {}
+ def _handle(self, event):
+ '''Dispatch the event to the descriptors
+ :param int event: The event (from libvirt's constants) being dispatched
+ '''
+ for callback in self.callbacks.values():
+ if callback.event is not None and callback.event & event:
+ callback.cb(callback.iden, self.fd, event, callback.opaque)
+ def update(self):
+ '''Register or unregister callbacks at event loop
+ This should be called after change of any ``.event`` in callbacks.
+ '''
+ # It seems like loop.add_{reader,writer} can be run multiple times
+ # and will still register the callback only once. Likewise,
+ # remove_{reader,writer} may be run even if the reader/writer
+ # is not registered (and will just return False).
+ # For the edge case of empty callbacks, any() returns False.
+ if any(callback.event & ~(
+ for callback in self.callbacks.values()):
+ warnings.warn(
+ 'The only event supported are VIR_EVENT_HANDLE_READABLE '
+ UserWarning)
+ if any(callback.event & libvirt.VIR_EVENT_HANDLE_READABLE
+ for callback in self.callbacks.values()):
+ self.impl.loop.add_reader(
+ self.fd, self._handle, libvirt.VIR_EVENT_HANDLE_READABLE)
+ else:
+ self.impl.loop.remove_reader(self.fd)
+ if any(callback.event & libvirt.VIR_EVENT_HANDLE_WRITABLE
+ for callback in self.callbacks.values()):
+ self.impl.loop.add_writer(
+ self.fd, self._handle, libvirt.VIR_EVENT_HANDLE_WRITABLE)
+ else:
+ self.impl.loop.remove_writer(self.fd)
+ def add_handle(self, callback):
+ '''Add a callback to the descriptor
+ :param FDCallback callback: the callback to add
+ :rtype: None
+ After adding the callback, it is immediately watched.
+ '''
+ self.callbacks[callback.iden] = callback
+ self.update()
+ def remove_handle(self, iden):
+ '''Remove a callback from the descriptor
+ :param int iden: the identifier of the callback
+ :returns: the callback
+ :rtype: FDCallback
+ After removing the callback, the descriptor may be unwatched, if there
+ are no more handles for it.
+ '''
+ callback = self.callbacks.pop(iden)
+ self.update()
+ return callback
+ def close(self):
+ ''''''
+ self.callbacks.clear()
+ self.update()
+class DescriptorDict(dict):
+ '''Descriptors collection
+ This is used internally by virEventAsyncIOImpl to hold descriptors.
+ '''
+ def __init__(self, impl):
+ super().__init__()
+ self.impl = impl
+ def __missing__(self, fd):
+ descriptor = Descriptor(self.impl, fd)
+ self[fd] = descriptor
+ return descriptor
+class FDCallback(Callback):
+ '''Callback for file descriptor (watcher)
+ :param Descriptor descriptor: the descriptor manager
+ :param int event: bitset of events on which to fire the callback
+ '''
+ # pylint: disable=too-few-public-methods
+ def __init__(self, *args, descriptor, event, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.descriptor = descriptor
+ self.event = event
+ def __repr__(self):
+ return '<{} iden={} fd={} event={}>'.format(
+ self.__class__.__name__, self.iden, self.descriptor.fd, self.event)
+ def update(self, event):
+ '''Update the callback and fix descriptor's watchers'''
+ self.event = event
+ self.descriptor.update()
+# timeouts
+class TimeoutCallback(Callback):
+ '''Callback for timer'''
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.timeout = -1
+ self._task = None
+ def __repr__(self):
+ return '<{} iden={} timeout={}>'.format(
+ self.__class__.__name__, self.iden, self.timeout)
+ @asyncio.coroutine
+ def _timer(self):
+ '''An actual timer running on the event loop.
+ This is a coroutine.
+ '''
+ while True:
+ try:
+ if self.timeout > 0:
+ timeout = self.timeout * 1e-3
+ self.impl.log.debug('sleeping %r', timeout)
+ yield from asyncio.sleep(timeout)
+ else:
+ # scheduling timeout for next loop iteration
+ yield
+ except asyncio.CancelledError:
+ self.impl.log.debug('timer %d cancelled', self.iden)
+ break
+ self.cb(self.iden, self.opaque)
+ self.impl.log.debug('timer %r callback ended', self.iden)
+ def update(self, timeout):
+ '''Start or the timer, possibly updating timeout'''
+ self.timeout = timeout
+ if self.timeout >= 0 and self._task is None:
+ self.impl.log.debug('timer %r start', self.iden)
+ self._task = ensure_future(self._timer(),
+ loop=self.impl.loop)
+ elif self.timeout < 0 and self._task is not None:
+ self.impl.log.debug('timer %r stop', self.iden)
+ self._task.cancel() # pylint: disable=no-member
+ self._task = None
+ def close(self):
+ '''Stop the timer and call ff callback'''
+ super(TimeoutCallback, self).close()
+ self.update(timeout=-1)
+# main implementation
+class virEventAsyncIOImpl(object):
+ '''Libvirt event adapter to asyncio.
+ :param loop: asyncio's event loop
+ If *loop* is not specified, the current (or default) event loop is used.
+ '''
+ def __init__(self, loop=None):
+ self.loop = loop or asyncio.get_event_loop()
+ self.callbacks = {}
+ self.descriptors = DescriptorDict(self)
+ self.log = logging.getLogger(self.__class__.__name__)
+ def register(self):
+ '''Register this instance as event loop implementation'''
+ # pylint: disable=bad-whitespace
+ self.log.debug('register()')
+ libvirt.virEventRegisterImpl(
+ self._add_handle, self._update_handle, self._remove_handle,
+ self._add_timeout, self._update_timeout, self._remove_timeout)
+ return self
+ def schedule_ff_callback(self, opaque):
+ '''Schedule a ff callback from one of the handles or timers'''
+ self.loop.call_soon(libvirt.virEventInvokeFreeCallback, opaque)
+ def is_idle(self):
+ '''Returns False if there are leftovers from a connection
+ Those may happen if there are sematical problems while closing
+ a connection. For example, not deregistered events before .close().
+ '''
+ return not self.callbacks
+ def _add_handle(self, fd, event, cb, opaque):
+ '''Register a callback for monitoring file handle events
+ :param int fd: file descriptor to listen on
+ :param int event: bitset of events on which to fire the callback
+ :param cb: the callback to be called when an event occurrs
+ :param opaque: user data to pass to the callback
+ :rtype: int
+ :returns: handle watch number to be used for updating and unregistering for events
+ .. seealso::
+ '''
+ self.log.debug('add_handle(fd=%d, event=%d, cb=%r, opaque=%r)',
+ fd, event, cb, opaque)
+ callback = FDCallback(self, cb, opaque,
+ descriptor=self.descriptors[fd], event=event)
+ self.callbacks[callback.iden] = callback
+ self.descriptors[fd].add_handle(callback)
+ return callback.iden
+ def _update_handle(self, watch, event):
+ '''Change event set for a monitored file handle
+ :param int watch: file descriptor watch to modify
+ :param int event: new events to listen on
+ .. seealso::
+ '''
+ self.log.debug('update_handle(watch=%d, event=%d)', watch, event)
+ return self.callbacks[watch].update(event=event)
+ def _remove_handle(self, watch):
+ '''Unregister a callback from a file handle.
+ :param int watch: file descriptor watch to stop listening on
+ :returns: None (see source for explanation)
+ .. seealso::
+ '''
+ self.log.debug('remove_handle(watch=%d)', watch)
+ callback = self.callbacks.pop(watch)
+ fd = callback.descriptor.fd
+ assert callback is self.descriptors[fd].remove_handle(watch)
+ if len(self.descriptors[fd].callbacks) == 0:
+ del self.descriptors[fd]
+ callback.close()
+ def _add_timeout(self, timeout, cb, opaque):
+ '''Register a callback for a timer event
+ :param int timeout: the timeout to monitor
+ :param cb: the callback to call when timeout has expired
+ :param opaque: user data to pass to the callback
+ :rtype: int
+ :returns: a timer value
+ .. seealso::
+ '''
+ self.log.debug('add_timeout(timeout=%d, cb=%r, opaque=%r)',
+ timeout, cb, opaque)
+ callback = TimeoutCallback(self, cb, opaque)
+ self.callbacks[callback.iden] = callback
+ callback.update(timeout=timeout)
+ return callback.iden
+ def _update_timeout(self, timer, timeout):
+ '''Change frequency for a timer
+ :param int timer: the timer to modify
+ :param int timeout: the new timeout value in ms
+ .. seealso::
+ '''
+ self.log.debug('update_timeout(timer=%d, timeout=%d)', timer, timeout)
+ return self.callbacks[timer].update(timeout=timeout)
+ def _remove_timeout(self, timer):
+ '''Unregister a callback for a timer
+ :param int timer: the timer to remove
+ :returns: None (see source for explanation)
+ .. seealso::
+ '''
+ self.log.debug('remove_timeout(timer=%d)', timer)
+ callback = self.callbacks.pop(timer)
+ callback.close()
+def virEventRegisterAsyncIOImpl(loop=None):
+ '''Arrange for libvirt's callbacks to be dispatched via asyncio event loop
+ The implementation object is returned, but in normal usage it can safely be
+ discarded.
+ '''
+ return virEventAsyncIOImpl(loop=loop).register()