summaryrefslogtreecommitdiff
path: root/fs/osfs/watch_inotify.py
blob: e33f3c706551d416ac35c9473adc6193210a23f3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
"""
fs.osfs.watch_inotify
=============

Change watcher support for OSFS, backed by pyinotify.

"""

import os
import sys
import errno
import select
import threading

from fs.errors import *
from fs.path import *
from fs.watch import *

try:
    import pyinotify
except Exception, e:
    #  pyinotify sometimes raises its own custom errors on import.
    #  How on earth are we supposed to catch them when we can't import them?
    if isinstance(e,ImportError):
        raise
    raise ImportError("could not import pyinotify")
try:
    pyinotify.WatchManager.get_fd
except AttributeError:
    raise ImportError("pyinotify version is too old")


class OSFSWatchMixin(WatchableFSMixin):
    """Mixin providing change-watcher support via pyinotify."""

    __watch_lock = threading.Lock()
    __watch_thread = None

    def close(self):
        super(OSFSWatchMixin,self).close()
        self.notify_watchers(CLOSED)
        for watcher_list in self._watchers.values():
            for watcher in watcher_list:
                self.del_watcher(watcher)
        self.__watch_lock.acquire()
        try:
            wt = self.__watch_thread
            if wt is not None and not wt.watchers:
                wt.stop()
                wt.join()
                OSFSWatchMixin.__watch_thread = None
        finally:
            self.__watch_lock.release()

    @convert_os_errors
    def add_watcher(self,callback,path="/",events=None,recursive=True):
        super_add_watcher = super(OSFSWatchMixin,self).add_watcher
        w = super_add_watcher(callback,path,events,recursive)
        w._pyinotify_id = None
        syspath = self.getsyspath(path)
        if isinstance(syspath,unicode):
            syspath = syspath.encode(sys.getfilesystemencoding())
        #  Each watch gets its own WatchManager, since it's tricky to make
        #  a single WatchManager handle multiple callbacks with different
        #  events for a single path.  This means we pay one file descriptor
        #  for each watcher added to the filesystem.  That's not too bad.
        w._pyinotify_WatchManager = wm = pyinotify.WatchManager()
        #  Each individual notifier gets multiplexed by a single shared thread.
        w._pyinotify_Notifier = pyinotify.Notifier(wm)
        evtmask = self.__get_event_mask(events)
        def process_events(event):
            self.__route_event(w,event)
        kwds = dict(rec=recursive,auto_add=recursive,quiet=False)
        try:
            wids = wm.add_watch(syspath,evtmask,process_events,**kwds)
        except pyinotify.WatchManagerError, e:
            raise OperationFailedError("add_watcher",details=e)
        w._pyinotify_id = wids[syspath]
        self.__watch_lock.acquire()
        try:
            wt = self.__get_watch_thread()
            wt.add_watcher(w)
        finally:
            self.__watch_lock.release()
        return w

    @convert_os_errors
    def del_watcher(self,watcher_or_callback):
        if isinstance(watcher_or_callback,Watcher):
            watchers = [watcher_or_callback]
        else:
            watchers = self._find_watchers(watcher_or_callback)
        for watcher in watchers:
            wm = watcher._pyinotify_WatchManager
            wm.rm_watch(watcher._pyinotify_id,rec=watcher.recursive)
            super(OSFSWatchMixin,self).del_watcher(watcher)
        self.__watch_lock.acquire()
        try:
            wt = self.__get_watch_thread()
            for watcher in watchers:
                wt.del_watcher(watcher)
        finally:
            self.__watch_lock.release()

    def __get_event_mask(self,events):
        """Convert the given set of events into a pyinotify event mask."""
        if events is None:
            events = (EVENT,)
        mask = 0
        for evt in events:
            if issubclass(ACCESSED,evt):
                mask |= pyinotify.IN_ACCESS
            if issubclass(CREATED,evt):
                mask |= pyinotify.IN_CREATE
            if issubclass(REMOVED,evt):
                mask |= pyinotify.IN_DELETE
                mask |= pyinotify.IN_DELETE_SELF
            if issubclass(MODIFIED,evt):
                mask |= pyinotify.IN_ATTRIB
                mask |= pyinotify.IN_MODIFY
                mask |= pyinotify.IN_CLOSE_WRITE
            if issubclass(MOVED_SRC,evt):
                mask |= pyinotify.IN_MOVED_FROM
                mask |= pyinotify.IN_MOVED_TO
            if issubclass(MOVED_DST,evt):
                mask |= pyinotify.IN_MOVED_FROM
                mask |= pyinotify.IN_MOVED_TO
            if issubclass(OVERFLOW,evt):
                mask |= pyinotify.IN_Q_OVERFLOW
            if issubclass(CLOSED,evt):
                mask |= pyinotify.IN_UNMOUNT
        return mask

    def __route_event(self,watcher,inevt):
        """Convert pyinotify event into fs.watch event, then handle it."""
        try:
            path = self.unsyspath(inevt.pathname)
        except ValueError:
            return
        try:
            src_path = inevt.src_pathname
            if src_path is not None:
                src_path = self.unsyspath(src_path)
        except (AttributeError,ValueError):
            src_path = None
        if inevt.mask & pyinotify.IN_ACCESS:
            watcher.handle_event(ACCESSED(self,path))
        if inevt.mask & pyinotify.IN_CREATE:
            watcher.handle_event(CREATED(self,path))
            #  Recursive watching of directories in pyinotify requires
            #  the creation of a new watch for each subdir, resulting in
            #  a race condition whereby events in the subdir are missed.
            #  We'd prefer to duplicate events than to miss them.
            if inevt.mask & pyinotify.IN_ISDIR:
                try:
                    #  pyinotify does this for dirs itself, we only.
                    #  need to worry about newly-created files.
                    for child in self.listdir(path,files_only=True):
                        cpath = pathjoin(path,child)
                        self.notify_watchers(CREATED,cpath)
                        self.notify_watchers(MODIFIED,cpath,True)
                except FSError:
                    pass
        if inevt.mask & pyinotify.IN_DELETE:
            watcher.handle_event(REMOVED(self,path))
        if inevt.mask & pyinotify.IN_DELETE_SELF:
            watcher.handle_event(REMOVED(self,path))
        if inevt.mask & pyinotify.IN_ATTRIB:
            watcher.handle_event(MODIFIED(self,path,False))
        if inevt.mask & pyinotify.IN_MODIFY:
            watcher.handle_event(MODIFIED(self,path,True))
        if inevt.mask & pyinotify.IN_CLOSE_WRITE:
            watcher.handle_event(MODIFIED(self,path,True, closed=True))
        if inevt.mask & pyinotify.IN_MOVED_FROM:
            # Sorry folks, I'm not up for decoding the destination path.
            watcher.handle_event(MOVED_SRC(self,path,None))
        if inevt.mask & pyinotify.IN_MOVED_TO:
            if getattr(inevt,"src_pathname",None):
                watcher.handle_event(MOVED_SRC(self,src_path,path))
                watcher.handle_event(MOVED_DST(self,path,src_path))
            else:
                watcher.handle_event(MOVED_DST(self,path,None))
        if inevt.mask & pyinotify.IN_Q_OVERFLOW:
            watcher.handle_event(OVERFLOW(self))
        if inevt.mask & pyinotify.IN_UNMOUNT:
            watcher.handle_event(CLOSE(self))

    def __get_watch_thread(self):
        """Get the shared watch thread, initializing if necessary.

        This method must only be called while holding self.__watch_lock, or
        multiple notifiers could be created.
        """
        if OSFSWatchMixin.__watch_thread is None:
            OSFSWatchMixin.__watch_thread = SharedThreadedNotifier()
            OSFSWatchMixin.__watch_thread.start()
        return OSFSWatchMixin.__watch_thread


class SharedThreadedNotifier(threading.Thread):
    """pyinotifer Notifier that can manage multiple WatchManagers.

    Each watcher added to an OSFS corresponds to a new pyinotify.WatchManager
    instance.  Rather than run a notifier thread for each manager, we run a
    single thread that multiplexes between them all.
    """

    def __init__(self):
        super(SharedThreadedNotifier,self).__init__()
        self.daemon = True
        self.running = True
        self._pipe_r, self._pipe_w = os.pipe()
        self._poller = select.poll()
        self._poller.register(self._pipe_r,select.POLLIN)
        self.watchers = {}

    def add_watcher(self,watcher):
        fd = watcher._pyinotify_WatchManager.get_fd()
        self.watchers[fd] = watcher
        self._poller.register(fd,select.POLLIN)
        #  Bump the poll object so it recognises the new fd.
        os.write(self._pipe_w,b"H")

    def del_watcher(self,watcher):
        fd = watcher._pyinotify_WatchManager.get_fd()
        try:
            del self.watchers[fd]
        except KeyError:
            pass
        else:
            self._poller.unregister(fd)

    def run(self):
        #  Grab some attributes of the select module, so they're available
        #  even when shutting down the interpreter.
        _select_error = select.error
        _select_POLLIN = select.POLLIN
        #  Loop until stopped, dispatching to individual notifiers.
        while self.running:
            try:
                ready_fds = self._poller.poll()
            except _select_error, e:
                if e[0] != errno.EINTR:
                    raise
            else:
                for (fd,event) in ready_fds:
                    #  Ignore all events other than "input ready".
                    if not event & _select_POLLIN:
                        continue
                    #  For signals on our internal pipe, just read and discard.
                    if fd == self._pipe_r:
                        os.read(self._pipe_r,1)
                    #  For notifier fds, dispath to the notifier methods.
                    else:
                        try:
                            notifier = self.watchers[fd]._pyinotify_Notifier
                        except KeyError:
                            pass
                        else:
                            notifier.read_events()
                            try:
                                notifier.process_events()
                            except EnvironmentError:
                                pass

    def stop(self):
        if self.running:
            self.running = False
            os.write(self._pipe_w,"S")
            os.close(self._pipe_w)