# # Copyright (C) 2018 Codethink Limited # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see . # # Authors: # Tristan Van Berkom import os import sys import curses import click # Import a widget internal for formatting time codes from .widget import TimeCode from .._scheduler import ElementJob # Status() # # A widget for formatting overall status. # # Note that the render() and clear() methods in this class are # simply noops in the case that the application is not connected # to a terminal, or if the terminal does not support ANSI escape codes. # # Args: # context (Context): The Context # content_profile (Profile): Formatting profile for content text # format_profile (Profile): Formatting profile for formatting text # success_profile (Profile): Formatting profile for success text # error_profile (Profile): Formatting profile for error text # stream (Stream): The Stream # colors (bool): Whether to print the ANSI color codes in the output # class Status(): # Table of the terminal capabilities we require and use _TERM_CAPABILITIES = { 'move_up': 'cuu1', 'move_x': 'hpa', 'clear_eol': 'el' } def __init__(self, context, content_profile, format_profile, success_profile, error_profile, stream, colors=False): self._context = context self._content_profile = content_profile self._format_profile = format_profile self._success_profile = success_profile self._error_profile = error_profile self._stream = stream self._jobs = [] self._last_lines = 0 # Number of status lines we last printed to console self._spacing = 1 self._colors = colors self._header = _StatusHeader(context, content_profile, format_profile, success_profile, error_profile, stream) self._term_width, _ = click.get_terminal_size() self._alloc_lines = 0 self._alloc_columns = None self._line_length = 0 self._need_alloc = True self._term_caps = self._init_terminal() # add_job() # # Adds a job to track in the status area # # Args: # element (Element): The element of the job to track # action_name (str): The action name for this job # def add_job(self, job): elapsed = self._stream.elapsed_time job = _StatusJob(self._context, job, self._content_profile, self._format_profile, elapsed) self._jobs.append(job) self._need_alloc = True # remove_job() # # Removes a job currently being tracked in the status area # # Args: # element (Element): The element of the job to track # action_name (str): The action name for this job # def remove_job(self, job): action_name = job.action_name if not isinstance(job, ElementJob): element = None else: element = job.element self._jobs = [ job for job in self._jobs if not (job.element is element and job.action_name == action_name) ] self._need_alloc = True # clear() # # Clear the status area, it is necessary to call # this before printing anything to the console if # a status area is in use. # # To print some logging to the output and then restore # the status, use the following: # # status.clear() # ... print something to console ... # status.render() # def clear(self): if not self._term_caps: return for _ in range(self._last_lines): self._move_up() self._clear_line() self._last_lines = 0 # render() # # Render the status area. # # If you are not printing a line in addition to rendering # the status area, for instance in a timeout, then it is # not necessary to call clear(). def render(self): if not self._term_caps: return elapsed = self._stream.elapsed_time self.clear() self._check_term_width() self._allocate() # Nothing to render, early return if self._alloc_lines == 0: return # Before rendering the actual lines, we need to add some line # feeds for the amount of lines we intend to print first, and # move cursor position back to the first line for _ in range(self._alloc_lines + self._header.lines): click.echo('', err=True) for _ in range(self._alloc_lines + self._header.lines): self._move_up() # Render the one line header text = self._header.render(self._term_width, elapsed) click.echo(text, color=self._colors, err=True) # Now we have the number of columns, and an allocation for # alignment of each column n_columns = len(self._alloc_columns) for line in self._job_lines(n_columns): text = '' for job in line: column = line.index(job) text += job.render(self._alloc_columns[column] - job.size, elapsed) # Add spacing between columns if column < (n_columns - 1): text += ' ' * self._spacing # Print the line click.echo(text, color=self._colors, err=True) # Track what we printed last, for the next clear self._last_lines = self._alloc_lines + self._header.lines ################################################### # Private Methods # ################################################### # _init_terminal() # # Initialize the terminal and return the resolved terminal # capabilities dictionary. # # Returns: # (dict|None): The resolved terminal capabilities dictionary, # or None if the terminal does not support all # of the required capabilities. # def _init_terminal(self): # We need both output streams to be connected to a terminal if not (sys.stdout.isatty() and sys.stderr.isatty()): return None # Initialized terminal, curses might decide it doesnt # support this terminal try: curses.setupterm(os.environ.get('TERM', 'dumb')) except curses.error: return None term_caps = {} # Resolve the string capabilities we need for the capability # names we need. # for capname, capval in self._TERM_CAPABILITIES.items(): code = curses.tigetstr(capval) # If any of the required capabilities resolve empty strings or None, # then we don't have the capabilities we need for a status bar on # this terminal. if not code: return None # Decode sequences as latin1, as they are always 8-bit bytes, # so when b'\xff' is returned, this must be decoded to u'\xff'. # # This technique is employed by the python blessings library # as well, and should provide better compatibility with most # terminals. # term_caps[capname] = code.decode('latin1') return term_caps def _check_term_width(self): term_width, _ = click.get_terminal_size() if self._term_width != term_width: self._term_width = term_width self._need_alloc = True def _move_up(self): assert self._term_caps is not None # Explicitly move to beginning of line, fixes things up # when there was a ^C or ^Z printed to the terminal. move_x = curses.tparm(self._term_caps['move_x'].encode('latin1'), 0) move_x = move_x.decode('latin1') move_up = curses.tparm(self._term_caps['move_up'].encode('latin1')) move_up = move_up.decode('latin1') click.echo(move_x + move_up, nl=False, err=True) def _clear_line(self): assert self._term_caps is not None clear_eol = curses.tparm(self._term_caps['clear_eol'].encode('latin1')) clear_eol = clear_eol.decode('latin1') click.echo(clear_eol, nl=False, err=True) def _allocate(self): if not self._need_alloc: return # State when there is no jobs to display alloc_lines = 0 alloc_columns = [] line_length = 0 # Test for the widest width which fits columnized jobs for columns in reversed(range(len(self._jobs))): alloc_lines, alloc_columns = self._allocate_columns(columns + 1) # If the sum of column widths with spacing in between # fits into the terminal width, this is a good allocation. line_length = sum(alloc_columns) + (columns * self._spacing) if line_length < self._term_width: break self._alloc_lines = alloc_lines self._alloc_columns = alloc_columns self._line_length = line_length self._need_alloc = False def _job_lines(self, columns): for i in range(0, len(self._jobs), columns): yield self._jobs[i:i + columns] # Returns an array of integers representing the maximum # length in characters for each column, given the current # list of jobs to render. # def _allocate_columns(self, columns): column_widths = [0 for _ in range(columns)] lines = 0 for line in self._job_lines(columns): line_len = len(line) lines += 1 for col in range(columns): if col < line_len: job = line[col] column_widths[col] = max(column_widths[col], job.size) return lines, column_widths # _StatusHeader() # # A delegate object for rendering the header part of the Status() widget # # Args: # context (Context): The Context # content_profile (Profile): Formatting profile for content text # format_profile (Profile): Formatting profile for formatting text # success_profile (Profile): Formatting profile for success text # error_profile (Profile): Formatting profile for error text # stream (Stream): The Stream # class _StatusHeader(): def __init__(self, context, content_profile, format_profile, success_profile, error_profile, stream): # # Public members # self.lines = 3 # # Private members # self._content_profile = content_profile self._format_profile = format_profile self._success_profile = success_profile self._error_profile = error_profile self._stream = stream self._time_code = TimeCode(context, content_profile, format_profile) self._context = context def render(self, line_length, elapsed): project = self._context.get_toplevel_project() line_length = max(line_length, 80) # # Line 1: Session time, project name, session / total elements # # ========= 00:00:00 project-name (143/387) ========= # session = str(len(self._stream.session_elements)) total = str(len(self._stream.total_elements)) size = 0 text = '' size += len(total) + len(session) + 4 # Size for (N/N) with a leading space size += 8 # Size of time code size += len(project.name) + 1 text += self._time_code.render_time(elapsed) text += ' ' + self._content_profile.fmt(project.name) text += ' ' + self._format_profile.fmt('(') + \ self._content_profile.fmt(session) + \ self._format_profile.fmt('/') + \ self._content_profile.fmt(total) + \ self._format_profile.fmt(')') line1 = self._centered(text, size, line_length, '=') # # Line 2: Dynamic list of queue status reports # # (Fetched:0 117 0)→ (Built:4 0 0) # size = 0 text = '' # Format and calculate size for each queue progress for queue in self._stream.queues: # Add spacing if self._stream.queues.index(queue) > 0: size += 2 text += self._format_profile.fmt('→ ') queue_text, queue_size = self._render_queue(queue) size += queue_size text += queue_text line2 = self._centered(text, size, line_length, ' ') # # Line 3: Cache usage percentage report # # ~~~~~~ cache: 69% ~~~~~~ # usage = self._context.get_artifact_cache_usage() usage_percent = '{}%'.format(usage.used_percent) size = 21 size += len(usage_percent) if usage.used_percent >= 95: formatted_usage_percent = self._error_profile.fmt(usage_percent) elif usage.used_percent >= 80: formatted_usage_percent = self._content_profile.fmt(usage_percent) else: formatted_usage_percent = self._success_profile.fmt(usage_percent) text = self._format_profile.fmt("~~~~~~ ") + \ self._content_profile.fmt('cache') + \ self._format_profile.fmt(': ') + \ formatted_usage_percent + \ self._format_profile.fmt(' ~~~~~~') line3 = self._centered(text, size, line_length, ' ') return line1 + '\n' + line2 + '\n' + line3 ################################################### # Private Methods # ################################################### def _render_queue(self, queue): processed = str(len(queue.processed_elements)) skipped = str(len(queue.skipped_elements)) failed = str(len(queue.failed_elements)) size = 5 # Space for the formatting '[', ':', ' ', ' ' and ']' size += len(queue.complete_name) size += len(processed) + len(skipped) + len(failed) text = self._format_profile.fmt("(") + \ self._content_profile.fmt(queue.complete_name) + \ self._format_profile.fmt(":") + \ self._success_profile.fmt(processed) + ' ' + \ self._content_profile.fmt(skipped) + ' ' + \ self._error_profile.fmt(failed) + \ self._format_profile.fmt(")") return (text, size) def _centered(self, text, size, line_length, fill): remaining = line_length - size remaining -= 2 final_text = self._format_profile.fmt(fill * (remaining // 2)) + ' ' final_text += text final_text += ' ' + self._format_profile.fmt(fill * (remaining // 2)) return final_text # _StatusJob() # # A delegate object for rendering a job in the status area # # Args: # context (Context): The Context # job (Job): The job being processed # content_profile (Profile): Formatting profile for content text # format_profile (Profile): Formatting profile for formatting text # elapsed (datetime): The offset into the session when this job is created # class _StatusJob(): def __init__(self, context, job, content_profile, format_profile, elapsed): action_name = job.action_name if not isinstance(job, ElementJob): element = None else: element = job.element # # Public members # self.element = element # The Element self.action_name = action_name # The action name self.size = None # The number of characters required to render self.full_name = element._get_full_name() if element else action_name # # Private members # self._offset = elapsed self._content_profile = content_profile self._format_profile = format_profile self._time_code = TimeCode(context, content_profile, format_profile) # Calculate the size needed to display self.size = 10 # Size of time code with brackets self.size += len(action_name) self.size += len(self.full_name) self.size += 3 # '[' + ':' + ']' # render() # # Render the Job, return a rendered string # # Args: # padding (int): Amount of padding to print in order to align with columns # elapsed (datetime): The session elapsed time offset # def render(self, padding, elapsed): text = self._format_profile.fmt('[') + \ self._time_code.render_time(elapsed - self._offset) + \ self._format_profile.fmt(']') # Add padding after the display name, before terminating ']' name = self.full_name + (' ' * padding) text += self._format_profile.fmt('[') + \ self._content_profile.fmt(self.action_name) + \ self._format_profile.fmt(':') + \ self._content_profile.fmt(name) + \ self._format_profile.fmt(']') return text