summaryrefslogtreecommitdiff
path: root/taskflow/persistence/backends/impl_dir.py
blob: b6d1a27bf2c3a3a9b46173ce3fd6d97b7de7aa0f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# -*- coding: utf-8 -*-

#    Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#    Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import contextlib
import errno
import os
import shutil

import cachetools
import fasteners
from oslo_serialization import jsonutils

from taskflow import exceptions as exc
from taskflow.persistence import path_based
from taskflow.utils import misc


@contextlib.contextmanager
def _storagefailure_wrapper():
    """Translate failures raised by the wrapped body into taskflow errors.

    Exceptions that already derive from ``exc.TaskFlowException`` pass
    through unchanged. An ``IOError``/``OSError`` whose ``errno`` is
    ``ENOENT`` is handed to ``exc.raise_with_cause`` as ``exc.NotFound``
    (the message includes the exception's ``filename``); any other
    ``Exception`` is handed to it as ``exc.StorageFailure``. In both cases
    the original exception is supplied as the cause.
    """
    try:
        yield
    except exc.TaskFlowException:
        # Already part of the taskflow hierarchy; nothing to translate.
        raise
    except Exception as failure:
        is_env_error = isinstance(failure, (IOError, OSError))
        # ``errno`` is only inspected for IOError/OSError (short-circuit).
        if not is_env_error or failure.errno != errno.ENOENT:
            exc.raise_with_cause(exc.StorageFailure,
                                 "Storage backend internal error",
                                 cause=failure)
        else:
            exc.raise_with_cause(exc.NotFound,
                                 'Item not found: %s' % failure.filename,
                                 cause=failure)


class DirBackend(path_based.PathBasedBackend):
    """A directory and file based backend.

    This backend does *not* provide true transactional semantics. It does
    guarantee that there will be no interprocess race conditions when
    writing and reading by using a consistent hierarchy of file based locks.

    Recognized configuration keys read by this class:

    * ``max_cache_size`` -- optional; when given, file contents are cached
      in a ``cachetools.LRUCache`` of that size, otherwise a plain
      (unbounded) ``dict`` is used.
    * ``encoding`` -- optional; text encoding used for stored files
      (defaults to ``'utf-8'``).

    Example configuration::

        conf = {
            "path": "/tmp/taskflow",  # save data to this root directory
            "max_cache_size": 1024,  # keep up-to 1024 entries in memory
        }
    """

    def __init__(self, conf):
        """Set up the file cache, encoding, root path and in-process lock.

        :param conf: backend configuration mapping (see the class docstring)
        :raises ValueError: if ``max_cache_size`` is smaller than one or if
                            the backend path is empty
        """
        super(DirBackend, self).__init__(conf)
        cache_limit = self._conf.get('max_cache_size')
        if cache_limit is None:
            # No limit requested; cache every file that gets read.
            self.file_cache = {}
        else:
            cache_limit = int(cache_limit)
            if cache_limit < 1:
                raise ValueError("Maximum cache size must be greater than"
                                 " or equal to one")
            self.file_cache = cachetools.LRUCache(cache_limit)
        self.encoding = self._conf.get('encoding', 'utf-8')
        # NOTE(review): ``self._path`` is not assigned in this class; it is
        # presumably populated by the parent constructor -- confirm there.
        if not self._path:
            raise ValueError("Empty path is disallowed")
        self._path = os.path.abspath(self._path)
        # Shared by every connection; used for their transactions.
        self.lock = fasteners.ReaderWriterLock()

    def get_connection(self):
        """Return a new :class:`Connection` bound to this backend."""
        return Connection(self)

    def close(self):
        """Do nothing; this backend performs no work on close."""


class Connection(path_based.PathBasedConnection):
    """Connection that stores items as files below the backend directory.

    Each item path is treated as a directory holding a ``metadata`` file
    (JSON content) and a ``lock`` file used for inter-process locking;
    children of a path are the symbolic links found inside it.
    """

    def _read_from(self, filename):
        """Return the decoded contents of ``filename``, using the cache.

        The file is (re)read when there is no cached entry for it or when
        its modification time is newer than the cached one.
        """
        # This is very similar to the oslo-incubator fileutils module, but
        # tweaked to not depend on a global cache, as well as tweaked to not
        # pull-in the oslo logging module (which is a huge pile of code).
        modified_at = os.path.getmtime(filename)
        entry = self.backend.file_cache.setdefault(filename, {})
        needs_reload = not entry or modified_at > entry.get('mtime', 0)
        if needs_reload:
            with open(filename, 'rb') as fp:
                raw = fp.read()
            entry['data'] = misc.binary_decode(
                raw, encoding=self.backend.encoding)
            entry['mtime'] = modified_at
        return entry['data']

    def _write_to(self, filename, contents):
        """Encode ``contents``, write it to ``filename``, drop cached copy."""
        encoded = misc.binary_encode(contents,
                                     encoding=self.backend.encoding)
        with open(filename, 'wb') as fp:
            fp.write(encoded)
        # Force the next read to go back to disk.
        self.backend.file_cache.pop(filename, None)

    @contextlib.contextmanager
    def _path_lock(self, path):
        """Hold the inter-process lock file located under ``path``.

        The lock is acquired first; the managed body then runs inside
        ``_storagefailure_wrapper`` so its failures get translated.
        """
        lock_path = self._join_path(path, 'lock')
        guard = fasteners.InterProcessLock(lock_path)
        with guard as held, _storagefailure_wrapper():
            yield held

    def _join_path(self, *parts):
        """Join path components using the operating system's separator."""
        return os.path.join(*parts)

    def _get_item(self, path):
        """Read and JSON-decode the ``metadata`` file stored under ``path``."""
        with self._path_lock(path):
            source = self._join_path(path, 'metadata')
            return misc.decode_json(self._read_from(source))

    def _set_item(self, path, value, transaction):
        """Serialize ``value`` as JSON into the ``metadata`` file of ``path``.

        ``transaction`` is accepted but not used by this implementation.
        """
        with self._path_lock(path):
            target = self._join_path(path, 'metadata')
            payload = jsonutils.dumps(value)
            self._write_to(target, payload)

    def _del_tree(self, path, transaction):
        """Recursively delete ``path`` while holding its lock.

        ``transaction`` is accepted but not used by this implementation.
        """
        with self._path_lock(path):
            shutil.rmtree(path)

    def _get_children(self, path):
        """Return the names of the symbolic links found directly in ``path``."""
        with _storagefailure_wrapper():
            children = []
            for name in os.listdir(path):
                if os.path.islink(self._join_path(path, name)):
                    children.append(name)
            return children

    def _ensure_path(self, path):
        """Make sure the directory tree ``path`` exists."""
        with _storagefailure_wrapper():
            misc.ensure_tree(path)

    def _create_link(self, src_path, dest_path, transaction):
        """Create a symlink at ``dest_path`` pointing to ``src_path``.

        An already existing ``dest_path`` is silently accepted.
        ``transaction`` is accepted but not used by this implementation.
        """
        with _storagefailure_wrapper():
            try:
                os.symlink(src_path, dest_path)
            except OSError as e:
                already_there = (e.errno == errno.EEXIST)
                if not already_there:
                    raise

    @contextlib.contextmanager
    def _transaction(self):
        """Hold the backend's shared write lock for the managed block."""
        with self.backend.lock.write_lock():
            yield

    def validate(self):
        """Check that the flow, atom and book directories all exist.

        A missing directory raises ``RuntimeError`` inside
        ``_storagefailure_wrapper``, which translates it for the caller.
        """
        with _storagefailure_wrapper():
            required = (self.flow_path, self.atom_path, self.book_path)
            for directory in required:
                if os.path.isdir(directory):
                    continue
                raise RuntimeError("Missing required directory: %s"
                                   % (directory))