1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
|
#!/usr/bin/env python3
#
# Copyright (C) 2018 Codethink Limited
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Tristan Van Berkom <tristan.vanberkom@codethink.co.uk>
import click
from blessings import Terminal
# Import a widget internal for formatting time codes
from .widget import TimeCode
# Status()
#
# A widget for formatting overall status.
#
# Note that the render() and clear() methods in this class are
# simply noops in the case that the application is not connected
# to a terminal, or if the terminal does not support ANSI escape codes.
#
# Args:
# content_profile (Profile): Formatting profile for content text
# format_profile (Profile): Formatting profile for formatting text
# success_profile (Profile): Formatting profile for success text
# error_profile (Profile): Formatting profile for error text
# pipeline (Pipeline): The Pipeline
# scheduler (Scheduler): The Scheduler
# colors (bool): Whether to print the ANSI color codes in the output
#
class Status():
def __init__(self, content_profile, format_profile,
success_profile, error_profile,
pipeline, scheduler, colors=False):
self._content_profile = content_profile
self._format_profile = format_profile
self._success_profile = success_profile
self._error_profile = error_profile
self._pipeline = pipeline
self._scheduler = scheduler
self._jobs = []
self._last_lines = 0 # Number of status lines we last printed to console
self._term = Terminal()
self._spacing = 1
self._colors = colors
self._header = _StatusHeader(content_profile, format_profile,
success_profile, error_profile,
pipeline, scheduler)
self._term_width, _ = click.get_terminal_size()
self._alloc_lines = 0
self._alloc_columns = None
self._line_length = 0
self._need_alloc = True
# add_job()
#
# Adds a job to track in the status area
#
# Args:
# element (Element): The element of the job to track
# action_name (str): The action name for this job
#
def add_job(self, element, action_name):
elapsed = self._scheduler.elapsed_time()
job = _StatusJob(element, action_name, self._content_profile, self._format_profile, elapsed)
self._jobs.append(job)
self._need_alloc = True
# remove_job()
#
# Removes a job currently being tracked in the status area
#
# Args:
# element (Element): The element of the job to track
# action_name (str): The action name for this job
#
def remove_job(self, element, action_name):
self._jobs = [
job for job in self._jobs
if not (job.element is element and
job.action_name == action_name)
]
self._need_alloc = True
# clear()
#
# Clear the status area, it is necessary to call
# this before printing anything to the console if
# a status area is in use.
#
# To print some logging to the output and then restore
# the status, use the following:
#
# status.clear()
# ... print something to console ...
# status.render()
#
def clear(self):
if not self._term.does_styling:
return
for _ in range(self._last_lines):
self._move_up()
self._clear_line()
self._last_lines = 0
# render()
#
# Render the status area.
#
# If you are not printing a line in addition to rendering
# the status area, for instance in a timeout, then it is
# not necessary to call clear().
def render(self):
if not self._term.does_styling:
return
elapsed = self._scheduler.elapsed_time()
self.clear()
self._check_term_width()
self._allocate()
# Nothing to render, early return
if self._alloc_lines == 0:
return
# Before rendering the actual lines, we need to add some line
# feeds for the amount of lines we intend to print first, and
# move cursor position back to the first line
for _ in range(self._alloc_lines + self._header.lines):
click.echo('', err=True)
for _ in range(self._alloc_lines + self._header.lines):
self._move_up()
# Render the one line header
text = self._header.render(self._term_width, elapsed)
click.echo(text, color=self._colors, err=True)
# Now we have the number of columns, and an allocation for
# alignment of each column
n_columns = len(self._alloc_columns)
for line in self._job_lines(n_columns):
text = ''
for job in line:
column = line.index(job)
text += job.render(self._alloc_columns[column] - job.size, elapsed)
# Add spacing between columns
if column < (n_columns - 1):
text += ' ' * self._spacing
# Print the line
click.echo(text, color=self._colors, err=True)
# Track what we printed last, for the next clear
self._last_lines = self._alloc_lines + self._header.lines
###################################################
# Private Methods #
###################################################
def _check_term_width(self):
term_width, _ = click.get_terminal_size()
if self._term_width != term_width:
self._term_width = term_width
self._need_alloc = True
def _move_up(self):
# Explicitly move to beginning of line, fixes things up
# when there was a ^C or ^Z printed to the terminal.
click.echo(self._term.move_x(0) + self._term.move_up, nl=False, err=True)
def _clear_line(self):
click.echo(self._term.clear_eol, nl=False, err=True)
def _allocate(self):
if not self._need_alloc:
return
# State when there is no jobs to display
alloc_lines = 0
alloc_columns = []
line_length = 0
# Test for the widest width which fits columnized jobs
for columns in reversed(range(len(self._jobs))):
alloc_lines, alloc_columns = self._allocate_columns(columns + 1)
# If the sum of column widths with spacing in between
# fits into the terminal width, this is a good allocation.
line_length = sum(alloc_columns) + (columns * self._spacing)
if line_length < self._term_width:
break
self._alloc_lines = alloc_lines
self._alloc_columns = alloc_columns
self._line_length = line_length
self._need_alloc = False
def _job_lines(self, columns):
for i in range(0, len(self._jobs), columns):
yield self._jobs[i:i + columns]
# Returns an array of integers representing the maximum
# length in characters for each column, given the current
# list of jobs to render.
#
def _allocate_columns(self, columns):
column_widths = [0 for _ in range(columns)]
lines = 0
for line in self._job_lines(columns):
line_len = len(line)
lines += 1
for col in range(columns):
if col < line_len:
job = line[col]
column_widths[col] = max(column_widths[col], job.size)
return lines, column_widths
# _StatusHeader()
#
# A delegate object for rendering the header part of the Status() widget
#
# Args:
# content_profile (Profile): Formatting profile for content text
# format_profile (Profile): Formatting profile for formatting text
# success_profile (Profile): Formatting profile for success text
# error_profile (Profile): Formatting profile for error text
# pipeline (Pipeline): The Pipeline
# scheduler (Scheduler): The Scheduler
#
class _StatusHeader():
def __init__(self, content_profile, format_profile,
success_profile, error_profile,
pipeline, scheduler):
#
# Public members
#
self.lines = 3
#
# Private members
#
self._content_profile = content_profile
self._format_profile = format_profile
self._success_profile = success_profile
self._error_profile = error_profile
self._pipeline = pipeline
self._scheduler = scheduler
self._time_code = TimeCode(content_profile, format_profile)
def render(self, line_length, elapsed):
line_length = max(line_length, 80)
size = 0
text = ''
session = str(self._pipeline.session_elements)
total = str(self._pipeline.total_elements)
# Format and calculate size for pipeline target and overall time code
size += len(total) + len(session) + 4 # Size for (N/N) with a leading space
size += 8 # Size of time code
size += len(self._pipeline.project.name) + 1
text += self._time_code.render_time(elapsed)
text += ' ' + self._content_profile.fmt(self._pipeline.project.name)
text += ' ' + self._format_profile.fmt('(') + \
self._content_profile.fmt(session) + \
self._format_profile.fmt('/') + \
self._content_profile.fmt(total) + \
self._format_profile.fmt(')')
line1 = self._centered(text, size, line_length, '=')
size = 0
text = ''
# Format and calculate size for each queue progress
for queue in self._scheduler.queues:
# Add spacing
if self._scheduler.queues.index(queue) > 0:
size += 2
text += self._format_profile.fmt('→ ')
queue_text, queue_size = self._render_queue(queue)
size += queue_size
text += queue_text
line2 = self._centered(text, size, line_length, ' ')
size = 24
text = self._format_profile.fmt("~~~~~ ") + \
self._content_profile.fmt('Active Tasks') + \
self._format_profile.fmt(" ~~~~~")
line3 = self._centered(text, size, line_length, ' ')
return line1 + '\n' + line2 + '\n' + line3
###################################################
# Private Methods #
###################################################
def _render_queue(self, queue):
processed = str(len(queue.processed_elements))
skipped = str(len(queue.skipped_elements))
failed = str(len(queue.failed_elements))
size = 5 # Space for the formatting '[', ':', ' ', ' ' and ']'
size += len(queue.complete_name)
size += len(processed) + len(skipped) + len(failed)
text = self._format_profile.fmt("(") + \
self._content_profile.fmt(queue.complete_name) + \
self._format_profile.fmt(":") + \
self._success_profile.fmt(processed) + ' ' + \
self._content_profile.fmt(skipped) + ' ' + \
self._error_profile.fmt(failed) + \
self._format_profile.fmt(")")
return (text, size)
def _centered(self, text, size, line_length, fill):
remaining = line_length - size
remaining -= 2
final_text = self._format_profile.fmt(fill * (remaining // 2)) + ' '
final_text += text
final_text += ' ' + self._format_profile.fmt(fill * (remaining // 2))
return final_text
# _StatusJob()
#
# A delegate object for rendering a job in the status area
#
# Args:
# element (Element): The element being processed
# action_name (str): The name of the action
# content_profile (Profile): Formatting profile for content text
# format_profile (Profile): Formatting profile for formatting text
# elapsed (datetime): The offset of the scheduler when this job is created
#
class _StatusJob():
def __init__(self, element, action_name, content_profile, format_profile, elapsed):
#
# Public members
#
self.element = element # The Element
self.action_name = action_name # The action name
self.size = None # The number of characters required to render
#
# Private members
#
self._offset = elapsed
self._content_profile = content_profile
self._format_profile = format_profile
self._time_code = TimeCode(content_profile, format_profile)
# Calculate the size needed to display
self.size = 10 # Size of time code with brackets
self.size += len(action_name)
self.size += len(element._get_full_name())
self.size += 3 # '[' + ':' + ']'
# render()
#
# Render the Job, return a rendered string
#
# Args:
# padding (int): Amount of padding to print in order to align with columns
# elapsed (datetime): The session elapsed time offset
#
def render(self, padding, elapsed):
text = self._format_profile.fmt('[') + \
self._time_code.render_time(elapsed - self._offset) + \
self._format_profile.fmt(']')
# Add padding after the display name, before terminating ']'
name = self.element._get_full_name() + (' ' * padding)
text += self._format_profile.fmt('[') + \
self._content_profile.fmt(self.action_name) + \
self._format_profile.fmt(':') + \
self._content_profile.fmt(name) + \
self._format_profile.fmt(']')
return text
|