# Copyright (C) 2014-2018,2020 Codethink Limited
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.


import json
import logging
import os
import sqlite3
import time

import yoyo

import lorrycontroller


class LorryNotFoundError(Exception):

    def __init__(self, path):
        Exception.__init__(
            self, 'Lorry with path %r not found in STATEDB' % path)


class WrongNumberLorriesRunningJob(Exception):

    def __init__(self, job_id, row_count):
        Exception.__init__(
            self,
            'STATEDB has %d Lorry specs running job %r, should be 1' %
            (row_count, job_id))


class HostNotFoundError(Exception):

    def __init__(self, host):
        Exception.__init__(
            self, 'Host %s not known in STATEDB' % host)


class StateDB(object):

    '''A wrapper around raw Sqlite for STATEDB.'''

    def __init__(self, filename):
        logging.debug('Creating StateDB instance for %r', filename)
        self._filename = filename
        self._conn = None
        self._transaction_started = None

        self.initial_lorries_fields = [
            ('path', 'TEXT PRIMARY KEY', None),
            ('text', 'TEXT', None),
            ('from_trovehost', 'TEXT', 'from_host'),
            ('from_path', 'TEXT', None),
            ('running_job', 'INT', None),
            ('last_run', 'INT', None),
            ('interval', 'INT', None),
            ('lorry_timeout', 'INT', None),
            ('disk_usage', 'INT', None),
        ]
        self.lorries_fields = list(self.initial_lorries_fields)
        self.lorries_fields.extend([
            ('last_run_exit', 'TEXT', None),
            ('last_run_error', 'TEXT', None),
        ])
        self.lorries_booleans = []

    def _open(self):
        if self._conn is None:
            db_exists = os.path.exists(self._filename)
            assert db_exists
            self._create_or_connect_to_db()

    def _create_or_connect_to_db(self):
        logging.debug(
            'Connecting to %r', self._filename)
        self._conn = sqlite3.connect(
            self._filename,
            timeout=100000,
            isolation_level="IMMEDIATE")
        logging.debug('New connection is %r', self._conn)

    def initialise_db(self):
        db_exists = os.path.exists(self._filename)
        if self._conn is None:
            self._create_or_connect_to_db()
        if not db_exists:
            self._initialise_tables()
        self._perform_any_migrations()

    def _perform_any_migrations(self):
        logging.debug('Performing any needed database migrations')
        backend = yoyo.get_backend('sqlite:///' + self._filename)
        migrations_dir = os.path.join(os.path.dirname(__file__), 'migrations')
        migrations = yoyo.read_migrations(migrations_dir)
        backend.apply_migrations(backend.to_apply(migrations))
        logging.debug('Database migrated')

    def _initialise_tables(self):
        logging.debug('Initialising tables in database')

        c = self._conn.cursor()

        # Note that this creates the *original* schema, which will
        # then be updated by the migrations (_perform_any_migrations
        # above). Since we did not use yoyo originally, this can't
        # be moved to a migration.

        # Table for holding the "are we scheduling jobs" value.

        c.execute('CREATE TABLE running_queue (running INT)')
        c.execute('INSERT INTO running_queue VALUES (1)')

        # Table for known remote hosts.

        c.execute(
            'CREATE TABLE troves ('
            'trovehost TEXT PRIMARY KEY, '
            'protocol TEXT, '
            'username TEXT, '
            'password TEXT, '
            'lorry_interval INT, '
            'lorry_timeout INT, '
            'ls_interval INT, '
            'ls_last_run INT, '
            'prefixmap TEXT, '
            'ignore TEXT '
            ')')

        # Table for all the known lorries (the "run queue").

        fields_sql = ', '.join(
            '%s %s' % (column, info)
            for column, info, key in self.initial_lorries_fields
        )
        c.execute('CREATE TABLE lorries (%s)' % fields_sql)

        # Table for the next available job id.

        c.execute('CREATE TABLE next_job_id (job_id INT)')
        c.execute('INSERT INTO next_job_id VALUES (1)')

        # Table of all jobs (running or not), and their info.

        c.execute(
            'CREATE TABLE jobs ('
            'job_id INT PRIMARY KEY, '
            'host TEXT, '
            'pid INT, '
            'started INT, '
            'ended INT, '
            'updated INT, '
            'kill INT, '
            'path TEXT, '
            'exit TEXT, '
            'disk_usage INT, '
            'output TEXT)')

        # Table for holding the max number of jobs running at once. If
        # there are no rows, there is no limit. Otherwise, there is
        # exactly one row.

        c.execute('CREATE TABLE max_jobs (max_jobs INT)')

        # A table to give the current pretended time, if one is set.
        # This table is either empty, in which case time.time() is
        # used, or has one row, which is used for the current time.

        c.execute('CREATE TABLE time (now INT)')

        # Stupid table we can always write to, to trigger the start of
        # a transaction.

        c.execute('CREATE TABLE stupid (value INT)')

        # Done.

        self._conn.commit()
        logging.debug('Finished initialising tables in STATEDB')

    @property
    def in_transaction(self):
        return self._transaction_started is not None

    def __enter__(self):
        logging.debug('Entering context manager (%r)', self)
        assert not self.in_transaction
        self._transaction_started = time.time()
        self._open()
        c = self._conn.cursor()
        c.execute('INSERT INTO stupid VALUES (1)')
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        logging.debug('Exiting context manager (%r)', self)
        assert self.in_transaction
        if exc_type is None:
            logging.debug(
                'Committing transaction in __exit__ (%r)', self._conn)
            c = self._conn.cursor()
            c.execute('DELETE FROM stupid')
            self._conn.commit()
        else:
            logging.error(
                'Rolling back transaction in __exit__ (%r)', self._conn,
                exc_info=(exc_type, exc_val, exc_tb))
            self._conn.rollback()
        self._conn.close()
        self._conn = None
        logging.debug(
            'Transaction duration: %r',
            time.time() - self._transaction_started)
        self._transaction_started = None
        return False

    def get_cursor(self):
        '''Return a new cursor.'''
        self._open()
        return self._conn.cursor()

    def get_running_queue(self):
        c = self.get_cursor()
        for (running,) in c.execute('SELECT running FROM running_queue'):
            return bool(running)

    def set_running_queue(self, new_status):
        logging.debug('StateDB.set_running_queue(%r) called', new_status)
        assert self.in_transaction
        if new_status:
            new_value = 1
        else:
            new_value = 0
        self.get_cursor().execute(
            'UPDATE running_queue SET running = ?', (new_value,))

    def get_host_info(self, host):
        c = self.get_cursor()
        c.execute(
            'SELECT protocol, username, password, type, type_params, '
            'lorry_interval, lorry_timeout, ls_interval, ls_last_run, '
            'prefixmap, ignore '
            'FROM hosts WHERE host IS ?',
            (host,))
        row = c.fetchone()
        if row is None:
            raise lorrycontroller.HostNotFoundError(host)
        return {
            'host': host,
            'protocol': row[0],
            'username': row[1],
            'password': row[2],
            'type': row[3],
            'type_params': json.loads(row[4]),
            'lorry_interval': row[5],
            'lorry_timeout': row[6],
            'ls_interval': row[7],
            'ls_last_run': row[8],
            'prefixmap': row[9],
            'ignore': row[10],
        }

    def add_host(self, host=None, protocol=None, username=None, password=None,
                 host_type=None, type_params={}, lorry_interval=None,
                 lorry_timeout=None, ls_interval=None, prefixmap=None,
                 ignore=None):
        logging.debug(
            'StateDB.add_host(%r,%r,%r,%r,%r,%r) called',
            host, lorry_interval, lorry_timeout, ls_interval,
            prefixmap, ignore)
        assert host is not None
        assert protocol is not None
        assert host_type is not None
        assert isinstance(type_params, dict)
        assert lorry_interval is not None
        assert lorry_timeout is not None
        assert ls_interval is not None
        assert prefixmap is not None
        assert ignore is not None
        assert self.in_transaction

        type_params = json.dumps(type_params)

        try:
            self.get_host_info(host)
        except lorrycontroller.HostNotFoundError:
            c = self.get_cursor()
            c.execute(
                'INSERT INTO hosts '
                '(host, protocol, username, password, type, type_params, '
                'lorry_interval, lorry_timeout, '
                'ls_interval, ls_last_run, '
                'prefixmap, ignore) '
                'VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)',
                (host, protocol, username, password, host_type, type_params,
                 lorry_interval, lorry_timeout, ls_interval, 0,
                 prefixmap, ignore))
        else:
            c = self.get_cursor()
            c.execute(
                'UPDATE hosts '
                'SET lorry_interval=?, lorry_timeout=?, ls_interval=?, '
                'prefixmap=?, ignore=?, protocol=?, type_params=? '
                'WHERE host IS ?',
                (lorry_interval, lorry_timeout, ls_interval,
                 prefixmap, ignore, protocol, type_params, host))

    def remove_host(self, host):
        logging.debug('StateDB.remove_host(%r) called', host)
        assert self.in_transaction
        c = self.get_cursor()
        c.execute('DELETE FROM hosts WHERE host=?', (host,))

    def get_hosts(self):
        c = self.get_cursor()
        c.execute('SELECT host FROM hosts')
        return [row[0] for row in c.fetchall()]

    def set_host_ls_last_run(self, host, ls_last_run):
        logging.debug(
            'StateDB.set_host_ls_last_run(%r,%r) called',
            host, ls_last_run)
        assert self.in_transaction
        c = self.get_cursor()
        c.execute(
            'UPDATE hosts SET ls_last_run=? WHERE host=?',
            (ls_last_run, host))

    def make_lorry_info_from_row(self, row):
        result = dict(
            (key or column, row[i])
            for i, (column, info, key) in enumerate(self.lorries_fields))
        for field in self.lorries_booleans:
            result[field] = bool(result[field])
        return result

    def get_lorry_info(self, path):
        c = self.get_cursor()
        c.execute('SELECT * FROM lorries WHERE path IS ?', (path,))
        row = c.fetchone()
        if row is None:
            raise lorrycontroller.LorryNotFoundError(path)
        return self.make_lorry_info_from_row(row)

    def get_all_lorries_info(self):
        c = self.get_cursor()
        c.execute('SELECT * FROM lorries ORDER BY (last_run + interval)')
        return [self.make_lorry_info_from_row(row) for row in c.fetchall()]

    def get_lorries_paths(self):
        c = self.get_cursor()
        return [
            row[0]
            for row in c.execute(
                'SELECT path FROM lorries ORDER BY (last_run + interval)')]

    def get_lorries_for_host(self, host):
        c = self.get_cursor()
        c.execute(
            'SELECT path FROM lorries WHERE from_trovehost IS ?', (host,))
        return [row[0] for row in c.fetchall()]

    def add_to_lorries(self, path=None, text=None, from_host=None,
                       from_path=None, interval=None, timeout=None):
        logging.debug(
            'StateDB.add_to_lorries('
            'path=%r, text=%r, from_host=%r, interval=%s, '
            'timeout=%r) called',
            path, text, from_host, interval, timeout)
        assert path is not None
        assert text is not None
        assert from_host is not None
        assert from_path is not None
        assert interval is not None
        assert timeout is not None
        assert self.in_transaction

        try:
            self.get_lorry_info(path)
        except lorrycontroller.LorryNotFoundError:
            c = self.get_cursor()
            c.execute(
                'INSERT INTO lorries '
                '(path, text, from_trovehost, from_path, last_run, interval, '
                'lorry_timeout, running_job) '
                'VALUES (?, ?, ?, ?, ?, ?, ?, ?)',
                (path, text, from_host, from_path, 0, interval, timeout,
                 None))
        else:
            c = self.get_cursor()
            c.execute(
                'UPDATE lorries '
                'SET text=?, from_trovehost=?, from_path=?, interval=?, '
                'lorry_timeout=? '
                'WHERE path IS ?',
                (text, from_host, from_path, interval, timeout, path))

    def remove_lorry(self, path):
        logging.debug('StateDB.remove_lorry(%r) called', path)
        assert self.in_transaction
        c = self.get_cursor()
        c.execute('DELETE FROM lorries WHERE path IS ?', (path,))

    def remove_lorries_for_host(self, host):
        logging.debug(
            'StateDB.remove_lorries_for_host(%r) called', host)
        assert self.in_transaction
        c = self.get_cursor()
        c.execute('DELETE FROM lorries WHERE from_trovehost IS ?', (host,))

    def set_running_job(self, path, job_id):
        logging.debug(
            'StateDB.set_running_job(%r, %r) called', path, job_id)
        assert self.in_transaction
        c = self.get_cursor()
        c.execute(
            'UPDATE lorries SET running_job=? WHERE path=?',
            (job_id, path))

    def find_lorry_running_job(self, job_id):
        c = self.get_cursor()
        c.execute(
            'SELECT path FROM lorries WHERE running_job IS ?', (job_id,))
        rows = c.fetchall()
        if len(rows) != 1:
            raise lorrycontroller.WrongNumberLorriesRunningJob(
                job_id, len(rows))
        return rows[0][0]

    def get_running_jobs(self):
        c = self.get_cursor()
        c.execute(
            'SELECT running_job FROM lorries WHERE running_job IS NOT NULL')
        return [row[0] for row in c.fetchall()]

    def set_kill_job(self, job_id, value):
        logging.debug('StateDB.set_kill_job(%r, %r) called', job_id, value)
        assert self.in_transaction
        if value:
            value = 1
        else:
            value = 0
        c = self.get_cursor()
        c.execute(
            'UPDATE jobs SET kill=? WHERE job_id=?',
            (value, job_id))

    def set_lorry_last_run(self, path, last_run):
        logging.debug(
            'StateDB.set_lorry_last_run(%r, %r) called', path, last_run)
        assert self.in_transaction
        c = self.get_cursor()
        c.execute(
            'UPDATE lorries SET last_run=? WHERE path=?',
            (last_run, path))

    def set_lorry_last_run_exit_and_output(self, path, exit, output):
        logging.debug(
            'StateDB.set_lorry_last_run_exit_and_output(%r, %r, %r) called',
            path, exit, output)
        assert self.in_transaction
        c = self.get_cursor()
        c.execute(
            'UPDATE lorries SET last_run_exit=?, last_run_error=? '
            'WHERE path=?',
            (exit, output, path))

    def set_lorry_disk_usage(self, path, disk_usage):
        logging.debug(
            'StateDB.set_lorry_disk_usage(%r, %r) called', path, disk_usage)
        assert self.in_transaction
        c = self.get_cursor()
        c.execute(
            'UPDATE lorries SET disk_usage=? WHERE path=?',
            (disk_usage, path))

    def get_next_job_id(self):
        logging.debug('StateDB.get_next_job_id called')
        assert self.in_transaction
        c = self.get_cursor()
        c.execute('SELECT job_id FROM next_job_id')
        row = c.fetchone()
        job_id = row[0]
        c.execute('UPDATE next_job_id SET job_id=?', (job_id + 1,))
        return job_id

    def get_job_ids(self):
        c = self.get_cursor()
        c.execute('SELECT job_id FROM jobs')
        return [row[0] for row in c.fetchall()]

    def get_job_info(self, job_id):
        c = self.get_cursor()
        c.execute(
            'SELECT job_id, host, pid, started, ended, updated, kill, '
            'path, exit, disk_usage, output FROM jobs WHERE job_id=?',
            (job_id,))
        row = c.fetchone()
        return {
            'job_id': row[0],
            'host': row[1],
            'pid': row[2],
            'started': row[3],
            'ended': row[4],
            'updated': row[5],
            'kill': row[6],
            'path': row[7],
            'exit': row[8],
            'disk_usage': row[9],
            'output': row[10],
        }

    def get_jobs_for_lorry(self, path):
        c = self.get_cursor()
        c.execute('SELECT job_id FROM jobs WHERE path=?', (path,))
        return [row[0] for row in c.fetchall()]

    def get_failed_jobs_for_lorry(self, path):
        c = self.get_cursor()
        c.execute(
            'SELECT job_id FROM jobs '
            'WHERE path=? AND exit != \'no\' AND exit != 0',
            (path,))
        return [row[0] for row in c.fetchall()]

    def add_new_job(self, job_id, host, pid, path, started):
        logging.debug(
            'StateDB.add_new_job(%r, %r, %r, %r, %r) called',
            job_id, host, pid, path, started)
        assert self.in_transaction
        c = self.get_cursor()
        c.execute(
            'INSERT INTO jobs (job_id, host, pid, path, started, '
            'updated, kill) '
            'VALUES (?, ?, ?, ?, ?, ?, ?)',
            (job_id, host, pid, path, started, started, 0))

    def get_job_minion_host(self, job_id):
        c = self.get_cursor()
        c.execute(
            'SELECT host FROM jobs WHERE job_id IS ?', (job_id,))
        row = c.fetchone()
        return row[0]

    def get_job_minion_pid(self, job_id):
        c = self.get_cursor()
        c.execute(
            'SELECT pid FROM jobs WHERE job_id IS ?', (job_id,))
        row = c.fetchone()
        return row[0]

    def get_job_path(self, job_id):
        c = self.get_cursor()
        c.execute(
            'SELECT path FROM jobs WHERE job_id IS ?', (job_id,))
        row = c.fetchone()
        return row[0]

    def get_job_started_and_ended(self, job_id):
        c = self.get_cursor()
        c.execute(
            'SELECT started, ended FROM jobs WHERE job_id IS ?', (job_id,))
        row = c.fetchone()
        return row[0], row[1]

    def get_job_updated(self, job_id):
        c = self.get_cursor()
        c.execute(
            'SELECT updated FROM jobs WHERE job_id IS ?', (job_id,))
        row = c.fetchone()
        return row[0]

    def set_job_updated(self, job_id, updated):
        logging.debug(
            'StateDB.set_job_updated(%r, %r) called', job_id, updated)
        assert self.in_transaction
        c = self.get_cursor()
        c.execute(
            'UPDATE jobs SET updated=? WHERE job_id IS ?',
            (updated, job_id))

    def get_job_exit(self, job_id):
        c = self.get_cursor()
        c.execute(
            'SELECT exit FROM jobs WHERE job_id IS ?', (job_id,))
        row = c.fetchone()
        return row[0]

    def set_job_exit(self, job_id, exit, ended, disk_usage):
        logging.debug(
            'StateDB.set_job_exit(%r, %r, %r, %r) called',
            job_id, exit, ended, disk_usage)
        assert self.in_transaction
        c = self.get_cursor()
        c.execute(
            'UPDATE jobs SET exit=?, ended=?, disk_usage=? '
            'WHERE job_id IS ?',
            (exit, ended, disk_usage, job_id))

    def get_job_disk_usage(self, job_id):
        c = self.get_cursor()
        c.execute('SELECT disk_usage FROM jobs WHERE job_id IS ?', (job_id,))
        row = c.fetchone()
        return row[0]

    def get_job_output(self, job_id):
        c = self.get_cursor()
        c.execute(
            'SELECT output FROM jobs WHERE job_id IS ?', (job_id,))
        row = c.fetchone()
        return row[0]

    def append_to_job_output(self, job_id, more_output):
        logging.debug('StateDB.append_to_job_output(%r, ...) called', job_id)
        assert self.in_transaction
        output = self.get_job_output(job_id) or ''
        c = self.get_cursor()
        c.execute(
            'UPDATE jobs SET output=? WHERE job_id=?',
            (output + more_output, job_id))

    def get_all_jobs_id_path_exit(self):
        '''Return id, path, and exit for all jobs.

        This is an ugly method, but it's much faster than first
        getting a list of job ids and then querying path and exit for
        each. Much, much faster. FTL versus the pitch drop experiment
        faster.

        This is a generator.

        '''
        c = self.get_cursor()
        c.execute('SELECT job_id, path, exit FROM jobs')
        while True:
            row = c.fetchone()
            if row is None:
                break
            yield row[0], row[1], row[2]

    def remove_job(self, job_id):
        logging.debug('StateDB.remove_job(%r) called', job_id)
        assert self.in_transaction
        c = self.get_cursor()
        c.execute('DELETE FROM jobs WHERE job_id = ?', (job_id,))

    def set_pretend_time(self, now):
        logging.debug('StateDB.set_pretend_time(%r) called', now)
        assert self.in_transaction
        c = self.get_cursor()
        c.execute('DELETE FROM time')
        c.execute('INSERT INTO time (now) VALUES (?)', (int(now),))

    def get_current_time(self):
        c = self.get_cursor()
        c.execute('SELECT now FROM time')
        row = c.fetchone()
        if row:
            return row[0]
        else:
            return time.time()

    def get_max_jobs(self):
        c = self.get_cursor()
        c.execute('SELECT max_jobs FROM max_jobs')
        row = c.fetchone()
        if row:
            logging.info('returning max_jobs as %r', row[0])
            return row[0]
        logging.info('returning max_jobs as None')
        return None

    def set_max_jobs(self, max_jobs):
        logging.debug('StateDB.set_max_jobs(%r) called', max_jobs)
        assert self.in_transaction
        c = self.get_cursor()
        c.execute('DELETE FROM max_jobs')
        if max_jobs is not None:
            c.execute(
                'INSERT INTO max_jobs (max_jobs) VALUES (?)',
                (max_jobs,))
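

# A minimal usage sketch. The database path and lorry values below are
# illustrative assumptions only; real callers pass the configured STATEDB
# filename. Write methods assert that they run inside the context manager,
# which turns each 'with' block into a single committed transaction.
#
#     statedb = StateDB('/srv/lorry-controller/statedb.sqlite')
#     statedb.initialise_db()
#     with statedb:
#         statedb.add_to_lorries(
#             path='example/project', text='{ ...lorry spec JSON... }',
#             from_host='git.example.com', from_path='project',
#             interval=3600, timeout=600)
#     with statedb:
#         for path in statedb.get_lorries_paths():
#             print(path)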