summaryrefslogtreecommitdiff
path: root/glance/store/scrubber.py
diff options
context:
space:
mode:
Diffstat (limited to 'glance/store/scrubber.py')
-rw-r--r--glance/store/scrubber.py523
1 files changed, 523 insertions, 0 deletions
diff --git a/glance/store/scrubber.py b/glance/store/scrubber.py
new file mode 100644
index 0000000..547a07a
--- /dev/null
+++ b/glance/store/scrubber.py
@@ -0,0 +1,523 @@
+# Copyright 2010 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import abc
+import calendar
+import eventlet
+import os
+import time
+
+from oslo.config import cfg
+
+from glance.common import crypt
+from glance.common import exception
+from glance.common import utils
+from glance import context
+from glance.openstack.common import lockutils
+import glance.openstack.common.log as logging
+import glance.registry.client.v1.api as registry
+
+LOG = logging.getLogger(__name__)
+
+scrubber_opts = [
+ cfg.StrOpt('scrubber_datadir',
+ default='/var/lib/glance/scrubber',
+ help=_('Directory that the scrubber will use to track '
+ 'information about what to delete. '
+ 'Make sure this is set in glance-api.conf and '
+ 'glance-scrubber.conf')),
+ cfg.IntOpt('scrub_time', default=0,
+ help=_('The amount of time in seconds to delay before '
+ 'performing a delete.')),
+ cfg.BoolOpt('cleanup_scrubber', default=False,
+ help=_('A boolean that determines if the scrubber should '
+ 'clean up the files it uses for taking data. Only '
+ 'one server in your deployment should be designated '
+ 'the cleanup host.')),
+ cfg.IntOpt('cleanup_scrubber_time', default=86400,
+ help=_('Items must have a modified time that is older than '
+ 'this value in order to be candidates for cleanup.'))
+]
+
+CONF = cfg.CONF
+CONF.register_opts(scrubber_opts)
+CONF.import_opt('metadata_encryption_key', 'glance.common.config')
+
+
+class ScrubQueue(object):
+ """Image scrub queue base class.
+
+ The queue contains image's location which need to delete from backend.
+ """
+ def __init__(self):
+ registry.configure_registry_client()
+ registry.configure_registry_admin_creds()
+ self.registry = registry.get_registry_client(context.RequestContext())
+
+ @abc.abstractmethod
+ def add_location(self, image_id, uri):
+ """Adding image location to scrub queue.
+
+ :param image_id: The opaque image identifier
+ :param uri: The opaque image location uri
+ """
+ pass
+
+ @abc.abstractmethod
+ def get_all_locations(self):
+ """Returns a list of image id and location tuple from scrub queue.
+
+ :retval a list of image id and location tuple from scrub queue
+ """
+ pass
+
+ @abc.abstractmethod
+ def pop_all_locations(self):
+ """Pop out a list of image id and location tuple from scrub queue.
+
+ :retval a list of image id and location tuple from scrub queue
+ """
+ pass
+
+ @abc.abstractmethod
+ def has_image(self, image_id):
+ """Returns whether the queue contains an image or not.
+ :param image_id: The opaque image identifier
+
+ :retval a boolean value to inform including or not
+ """
+ pass
+
+
+class ScrubFileQueue(ScrubQueue):
+ """File-based image scrub queue class."""
+ def __init__(self):
+ super(ScrubFileQueue, self).__init__()
+ self.scrubber_datadir = CONF.scrubber_datadir
+ utils.safe_mkdirs(self.scrubber_datadir)
+ self.scrub_time = CONF.scrub_time
+ self.metadata_encryption_key = CONF.metadata_encryption_key
+
+ def _read_queue_file(self, file_path):
+ """Reading queue file to loading deleted location and timestamp out.
+
+ :param file_path: Queue file full path
+
+ :retval a list of image location timestamp tuple from queue file
+ """
+ uris = []
+ delete_times = []
+
+ try:
+ with open(file_path, 'r') as f:
+ while True:
+ uri = f.readline().strip()
+ if uri:
+ uris.append(uri)
+ delete_times.append(int(f.readline().strip()))
+ else:
+ break
+ except Exception:
+ LOG.error(_("%s file can not be read.") % file_path)
+
+ return uris, delete_times
+
+ def _update_queue_file(self, file_path, remove_record_idxs):
+ """Updating queue file to remove such queue records.
+
+ :param file_path: Queue file full path
+ :param remove_record_idxs: A list of record index those want to remove
+ """
+ try:
+ with open(file_path, 'r') as f:
+ lines = f.readlines()
+ # NOTE(zhiyan) we need bottom up removing to
+ # keep record index be valid.
+ remove_record_idxs.sort(reverse=True)
+ for record_idx in remove_record_idxs:
+ # Each record has two lines
+ line_no = (record_idx + 1) * 2 - 1
+ del lines[line_no:line_no + 2]
+ with open(file_path, 'w') as f:
+ f.write(''.join(lines))
+ os.chmod(file_path, 0o600)
+ except Exception:
+ LOG.error(_("%s file can not be wrote.") % file_path)
+
+ def add_location(self, image_id, uri):
+ """Adding image location to scrub queue.
+
+ :param image_id: The opaque image identifier
+ :param uri: The opaque image location uri
+ """
+ with lockutils.lock("scrubber-%s" % image_id,
+ lock_file_prefix='glance-', external=True):
+
+ # NOTE(zhiyan): make sure scrubber does not cleanup
+ # 'pending_delete' images concurrently before the code
+ # get lock and reach here.
+ try:
+ image = self.registry.get_image(image_id)
+ if image['status'] == 'deleted':
+ return
+ except exception.NotFound as e:
+ LOG.error(_("Failed to find image to delete: "
+ "%(e)s"), {'e': e})
+ return
+
+ delete_time = time.time() + self.scrub_time
+ file_path = os.path.join(self.scrubber_datadir, str(image_id))
+
+ if self.metadata_encryption_key is not None:
+ uri = crypt.urlsafe_encrypt(self.metadata_encryption_key,
+ uri, 64)
+
+ if os.path.exists(file_path):
+ # Append the uri of location to the queue file
+ with open(file_path, 'a') as f:
+ f.write('\n')
+ f.write('\n'.join([uri, str(int(delete_time))]))
+ else:
+ # NOTE(zhiyan): Protect the file before we write any data.
+ open(file_path, 'w').close()
+ os.chmod(file_path, 0o600)
+ with open(file_path, 'w') as f:
+ f.write('\n'.join([uri, str(int(delete_time))]))
+ os.utime(file_path, (delete_time, delete_time))
+
+ def _walk_all_locations(self, remove=False):
+ """Returns a list of image id and location tuple from scrub queue.
+
+ :param remove: Whether remove location from queue or not after walk
+
+ :retval a list of image image_id and location tuple from scrub queue
+ """
+ if not os.path.exists(self.scrubber_datadir):
+ LOG.info(_("%s directory does not exist.") % self.scrubber_datadir)
+ return []
+
+ ret = []
+ for root, dirs, files in os.walk(self.scrubber_datadir):
+ for image_id in files:
+ if not utils.is_uuid_like(image_id):
+ continue
+ with lockutils.lock("scrubber-%s" % image_id,
+ lock_file_prefix='glance-', external=True):
+ file_path = os.path.join(self.scrubber_datadir, image_id)
+ uris, delete_times = self._read_queue_file(file_path)
+
+ remove_record_idxs = []
+ skipped = False
+ for (record_idx, delete_time) in enumerate(delete_times):
+ if delete_time > time.time():
+ skipped = True
+ continue
+ else:
+ ret.append((image_id, uris[record_idx]))
+ remove_record_idxs.append(record_idx)
+ if remove:
+ if skipped:
+ # NOTE(zhiyan): remove location records from
+ # the queue file.
+ self._update_queue_file(file_path,
+ remove_record_idxs)
+ else:
+ utils.safe_remove(file_path)
+ return ret
+
+ def get_all_locations(self):
+ """Returns a list of image id and location tuple from scrub queue.
+
+ :retval a list of image id and location tuple from scrub queue
+ """
+ return self._walk_all_locations()
+
+ def pop_all_locations(self):
+ """Pop out a list of image id and location tuple from scrub queue.
+
+ :retval a list of image id and location tuple from scrub queue
+ """
+ return self._walk_all_locations(remove=True)
+
+ def has_image(self, image_id):
+ """Returns whether the queue contains an image or not.
+
+ :param image_id: The opaque image identifier
+
+ :retval a boolean value to inform including or not
+ """
+ return os.path.exists(os.path.join(self.scrubber_datadir,
+ str(image_id)))
+
+
+class ScrubDBQueue(ScrubQueue):
+ """Database-based image scrub queue class."""
+ def __init__(self):
+ super(ScrubDBQueue, self).__init__()
+ self.cleanup_scrubber_time = CONF.cleanup_scrubber_time
+
+ def add_location(self, image_id, uri):
+ """Adding image location to scrub queue.
+
+ :param image_id: The opaque image identifier
+ :param uri: The opaque image location uri
+ """
+ raise NotImplementedError
+
+ def _walk_all_locations(self, remove=False):
+ """Returns a list of image id and location tuple from scrub queue.
+
+ :param remove: Whether remove location from queue or not after walk
+
+ :retval a list of image id and location tuple from scrub queue
+ """
+ filters = {'deleted': True,
+ 'is_public': 'none',
+ 'status': 'pending_delete'}
+ ret = []
+ for image in self.registry.get_images_detailed(filters=filters):
+ deleted_at = image.get('deleted_at')
+ if not deleted_at:
+ continue
+
+ # NOTE: Strip off microseconds which may occur after the last '.,'
+ # Example: 2012-07-07T19:14:34.974216
+ date_str = deleted_at.rsplit('.', 1)[0].rsplit(',', 1)[0]
+ delete_time = calendar.timegm(time.strptime(date_str,
+ "%Y-%m-%dT%H:%M:%S"))
+
+ if delete_time + self.cleanup_scrubber_time > time.time():
+ continue
+
+ ret.extend([(image['id'], location['uri'])
+ for location in image['location_data']])
+
+ if remove:
+ self.registry.update_image(image['id'], {'status': 'deleted'})
+ return ret
+
+ def get_all_locations(self):
+ """Returns a list of image id and location tuple from scrub queue.
+
+ :retval a list of image id and location tuple from scrub queue
+ """
+ return self._walk_all_locations()
+
+ def pop_all_locations(self):
+ """Pop out a list of image id and location tuple from scrub queue.
+
+ :retval a list of image id and location tuple from scrub queue
+ """
+ return self._walk_all_locations(remove=True)
+
+ def has_image(self, image_id):
+ """Returns whether the queue contains an image or not.
+
+ :param image_id: The opaque image identifier
+
+ :retval a boolean value to inform including or not
+ """
+ try:
+ image = self.registry.get_image(image_id)
+ return image['status'] == 'pending_delete'
+ except exception.NotFound as e:
+ return False
+
+
+_file_queue = None
+_db_queue = None
+
+
+def get_scrub_queues():
+ global _file_queue, _db_queue
+ if not _file_queue:
+ _file_queue = ScrubFileQueue()
+ if not _db_queue:
+ _db_queue = ScrubDBQueue()
+ return (_file_queue, _db_queue)
+
+
+class Daemon(object):
+ def __init__(self, wakeup_time=300, threads=1000):
+ LOG.info(_("Starting Daemon: wakeup_time=%(wakeup_time)s "
+ "threads=%(threads)s"),
+ {'wakeup_time': wakeup_time, 'threads': threads})
+ self.wakeup_time = wakeup_time
+ self.event = eventlet.event.Event()
+ self.pool = eventlet.greenpool.GreenPool(threads)
+
+ def start(self, application):
+ self._run(application)
+
+ def wait(self):
+ try:
+ self.event.wait()
+ except KeyboardInterrupt:
+ msg = _("Daemon Shutdown on KeyboardInterrupt")
+ LOG.info(msg)
+
+ def _run(self, application):
+ LOG.debug(_("Running application"))
+ self.pool.spawn_n(application.run, self.pool, self.event)
+ eventlet.spawn_after(self.wakeup_time, self._run, application)
+ LOG.debug(_("Next run scheduled in %s seconds") % self.wakeup_time)
+
+
+class Scrubber(object):
+ def __init__(self, store_api):
+ LOG.info(_("Initializing scrubber with configuration: %s") %
+ unicode({'scrubber_datadir': CONF.scrubber_datadir,
+ 'cleanup': CONF.cleanup_scrubber,
+ 'cleanup_time': CONF.cleanup_scrubber_time,
+ 'registry_host': CONF.registry_host,
+ 'registry_port': CONF.registry_port}))
+
+ utils.safe_mkdirs(CONF.scrubber_datadir)
+
+ self.store_api = store_api
+
+ registry.configure_registry_client()
+ registry.configure_registry_admin_creds()
+ self.registry = registry.get_registry_client(context.RequestContext())
+
+ (self.file_queue, self.db_queue) = get_scrub_queues()
+
+ def _get_delete_jobs(self, queue, pop):
+ try:
+ if pop:
+ image_id_uri_list = queue.pop_all_locations()
+ else:
+ image_id_uri_list = queue.get_all_locations()
+ except Exception:
+ LOG.error(_("Can not %s scrub jobs from queue.") %
+ 'pop' if pop else 'get')
+ return None
+
+ delete_jobs = {}
+ for image_id, image_uri in image_id_uri_list:
+ if image_id not in delete_jobs:
+ delete_jobs[image_id] = []
+ delete_jobs[image_id].append((image_id, image_uri))
+ return delete_jobs
+
+ def run(self, pool, event=None):
+ delete_jobs = self._get_delete_jobs(self.file_queue, True)
+ if delete_jobs:
+ for image_id, jobs in delete_jobs.iteritems():
+ self._scrub_image(pool, image_id, jobs)
+
+ if CONF.cleanup_scrubber:
+ self._cleanup(pool)
+
+ def _scrub_image(self, pool, image_id, delete_jobs):
+ if len(delete_jobs) == 0:
+ return
+
+ LOG.info(_("Scrubbing image %(id)s from %(count)d locations.") %
+ {'id': image_id, 'count': len(delete_jobs)})
+ # NOTE(bourke): The starmap must be iterated to do work
+ list(pool.starmap(self._delete_image_from_backend, delete_jobs))
+
+ image = self.registry.get_image(image_id)
+ if (image['status'] == 'pending_delete' and
+ not self.file_queue.has_image(image_id)):
+ self.registry.update_image(image_id, {'status': 'deleted'})
+
+ def _delete_image_from_backend(self, image_id, uri):
+ if CONF.metadata_encryption_key is not None:
+ uri = crypt.urlsafe_decrypt(CONF.metadata_encryption_key, uri)
+
+ try:
+ LOG.debug(_("Deleting URI from image %(image_id)s.") %
+ {'image_id': image_id})
+
+ # Here we create a request context with credentials to support
+ # delayed delete when using multi-tenant backend storage
+ admin_tenant = CONF.admin_tenant_name
+ auth_token = self.registry.auth_tok
+ admin_context = context.RequestContext(user=CONF.admin_user,
+ tenant=admin_tenant,
+ auth_tok=auth_token)
+
+ self.store_api.delete_from_backend(admin_context, uri)
+ except Exception:
+ msg = _("Failed to delete URI from image %(image_id)s")
+ LOG.error(msg % {'image_id': image_id})
+
+ def _read_cleanup_file(self, file_path):
+ """Reading cleanup to get latest cleanup timestamp.
+
+ :param file_path: Cleanup status file full path
+
+ :retval latest cleanup timestamp
+ """
+ try:
+ if not os.path.exists(file_path):
+ msg = _("%s file is not exists.") % unicode(file_path)
+ raise Exception(msg)
+ atime = int(os.path.getatime(file_path))
+ mtime = int(os.path.getmtime(file_path))
+ if atime != mtime:
+ msg = _("%s file contains conflicting cleanup "
+ "timestamp.") % unicode(file_path)
+ raise Exception(msg)
+ return atime
+ except Exception as e:
+ LOG.error(e)
+ return None
+
+ def _update_cleanup_file(self, file_path, cleanup_time):
+ """Update latest cleanup timestamp to cleanup file.
+
+ :param file_path: Cleanup status file full path
+ :param cleanup_time: The Latest cleanup timestamp
+ """
+ try:
+ open(file_path, 'w').close()
+ os.chmod(file_path, 0o600)
+ os.utime(file_path, (cleanup_time, cleanup_time))
+ except Exception:
+ LOG.error(_("%s file can not be created.") % unicode(file_path))
+
+ def _cleanup(self, pool):
+ now = time.time()
+ cleanup_file = os.path.join(CONF.scrubber_datadir, ".cleanup")
+ if not os.path.exists(cleanup_file):
+ self._update_cleanup_file(cleanup_file, now)
+ return
+
+ last_cleanup_time = self._read_cleanup_file(cleanup_file)
+ cleanup_time = last_cleanup_time + CONF.cleanup_scrubber_time
+ if cleanup_time > now:
+ return
+
+ LOG.info(_("Getting images deleted before "
+ "%s") % CONF.cleanup_scrubber_time)
+ self._update_cleanup_file(cleanup_file, now)
+
+ delete_jobs = self._get_delete_jobs(self.db_queue, False)
+ if not delete_jobs:
+ return
+
+ for image_id, jobs in delete_jobs.iteritems():
+ with lockutils.lock("scrubber-%s" % image_id,
+ lock_file_prefix='glance-', external=True):
+ if not self.file_queue.has_image(image_id):
+ # NOTE(zhiyan): scrubber should not cleanup this image
+ # since a queue file be created for this 'pending_delete'
+ # image concurrently before the code get lock and
+ # reach here. The checking only be worth if glance-api and
+ # glance-scrubber service be deployed on a same host.
+ self._scrub_image(pool, image_id, jobs)