summaryrefslogtreecommitdiff
path: root/src/pip/_internal/cli/progress_bars.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/pip/_internal/cli/progress_bars.py')
-rw-r--r--src/pip/_internal/cli/progress_bars.py298
1 files changed, 58 insertions, 240 deletions
diff --git a/src/pip/_internal/cli/progress_bars.py b/src/pip/_internal/cli/progress_bars.py
index f3db29519..0ad14031c 100644
--- a/src/pip/_internal/cli/progress_bars.py
+++ b/src/pip/_internal/cli/progress_bars.py
@@ -1,250 +1,68 @@
-import itertools
-import sys
-from signal import SIGINT, default_int_handler, signal
-from typing import Any
+import functools
+from typing import Callable, Generator, Iterable, Iterator, Optional, Tuple
+
+from pip._vendor.rich.progress import (
+ BarColumn,
+ DownloadColumn,
+ FileSizeColumn,
+ Progress,
+ ProgressColumn,
+ SpinnerColumn,
+ TextColumn,
+ TimeElapsedColumn,
+ TimeRemainingColumn,
+ TransferSpeedColumn,
+)
-from pip._vendor.progress.bar import Bar, FillingCirclesBar, IncrementalBar
-from pip._vendor.progress.spinner import Spinner
-
-from pip._internal.utils.compat import WINDOWS
from pip._internal.utils.logging import get_indentation
-from pip._internal.utils.misc import format_size
-
-try:
- from pip._vendor import colorama
-# Lots of different errors can come from this, including SystemError and
-# ImportError.
-except Exception:
- colorama = None
-
-
-def _select_progress_class(preferred: Bar, fallback: Bar) -> Bar:
- encoding = getattr(preferred.file, "encoding", None)
- # If we don't know what encoding this file is in, then we'll just assume
- # that it doesn't support unicode and use the ASCII bar.
- if not encoding:
- return fallback
-
- # Collect all of the possible characters we want to use with the preferred
- # bar.
- characters = [
- getattr(preferred, "empty_fill", ""),
- getattr(preferred, "fill", ""),
- ]
- characters += list(getattr(preferred, "phases", []))
-
- # Try to decode the characters we're using for the bar using the encoding
- # of the given file, if this works then we'll assume that we can use the
- # fancier bar and if not we'll fall back to the plaintext bar.
- try:
- "".join(characters).encode(encoding)
- except UnicodeEncodeError:
- return fallback
+DownloadProgressRenderer = Callable[[Iterable[bytes]], Iterator[bytes]]
+
+
+def _rich_progress_bar(
+ iterable: Iterable[bytes],
+ *,
+ bar_type: str,
+ size: int,
+) -> Generator[bytes, None, None]:
+ assert bar_type == "on", "This should only be used in the default mode."
+
+ if not size:
+ total = float("inf")
+ columns: Tuple[ProgressColumn, ...] = (
+ TextColumn("[progress.description]{task.description}"),
+ SpinnerColumn("line", speed=1.5),
+ FileSizeColumn(),
+ TransferSpeedColumn(),
+ TimeElapsedColumn(),
+ )
else:
- return preferred
-
-
-_BaseBar: Any = _select_progress_class(IncrementalBar, Bar)
-
-
-class InterruptibleMixin:
- """
- Helper to ensure that self.finish() gets called on keyboard interrupt.
-
- This allows downloads to be interrupted without leaving temporary state
- (like hidden cursors) behind.
-
- This class is similar to the progress library's existing SigIntMixin
- helper, but as of version 1.2, that helper has the following problems:
-
- 1. It calls sys.exit().
- 2. It discards the existing SIGINT handler completely.
- 3. It leaves its own handler in place even after an uninterrupted finish,
- which will have unexpected delayed effects if the user triggers an
- unrelated keyboard interrupt some time after a progress-displaying
- download has already completed, for example.
- """
-
- def __init__(self, *args: Any, **kwargs: Any) -> None:
- """
- Save the original SIGINT handler for later.
- """
- # https://github.com/python/mypy/issues/5887
- super().__init__(*args, **kwargs) # type: ignore
-
- self.original_handler = signal(SIGINT, self.handle_sigint)
-
- # If signal() returns None, the previous handler was not installed from
- # Python, and we cannot restore it. This probably should not happen,
- # but if it does, we must restore something sensible instead, at least.
- # The least bad option should be Python's default SIGINT handler, which
- # just raises KeyboardInterrupt.
- if self.original_handler is None:
- self.original_handler = default_int_handler
-
- def finish(self) -> None:
- """
- Restore the original SIGINT handler after finishing.
-
- This should happen regardless of whether the progress display finishes
- normally, or gets interrupted.
- """
- super().finish() # type: ignore
- signal(SIGINT, self.original_handler)
-
- def handle_sigint(self, signum, frame): # type: ignore
- """
- Call self.finish() before delegating to the original SIGINT handler.
-
- This handler should only be in place while the progress display is
- active.
- """
- self.finish()
- self.original_handler(signum, frame)
-
-
-class SilentBar(Bar):
- def update(self) -> None:
- pass
-
-
-class BlueEmojiBar(IncrementalBar):
-
- suffix = "%(percent)d%%"
- bar_prefix = " "
- bar_suffix = " "
- phases = ("\U0001F539", "\U0001F537", "\U0001F535")
-
-
-class DownloadProgressMixin:
- def __init__(self, *args: Any, **kwargs: Any) -> None:
- # https://github.com/python/mypy/issues/5887
- super().__init__(*args, **kwargs) # type: ignore
- self.message: str = (" " * (get_indentation() + 2)) + self.message
-
- @property
- def downloaded(self) -> str:
- return format_size(self.index) # type: ignore
-
- @property
- def download_speed(self) -> str:
- # Avoid zero division errors...
- if self.avg == 0.0: # type: ignore
- return "..."
- return format_size(1 / self.avg) + "/s" # type: ignore
-
- @property
- def pretty_eta(self) -> str:
- if self.eta: # type: ignore
- return f"eta {self.eta_td}" # type: ignore
- return ""
-
- def iter(self, it): # type: ignore
- for x in it:
- yield x
- # B305 is incorrectly raised here
- # https://github.com/PyCQA/flake8-bugbear/issues/59
- self.next(len(x)) # noqa: B305
- self.finish()
-
-
-class WindowsMixin:
- def __init__(self, *args: Any, **kwargs: Any) -> None:
- # The Windows terminal does not support the hide/show cursor ANSI codes
- # even with colorama. So we'll ensure that hide_cursor is False on
- # Windows.
- # This call needs to go before the super() call, so that hide_cursor
- # is set in time. The base progress bar class writes the "hide cursor"
- # code to the terminal in its init, so if we don't set this soon
- # enough, we get a "hide" with no corresponding "show"...
- if WINDOWS and self.hide_cursor: # type: ignore
- self.hide_cursor = False
-
- # https://github.com/python/mypy/issues/5887
- super().__init__(*args, **kwargs) # type: ignore
-
- # Check if we are running on Windows and we have the colorama module,
- # if we do then wrap our file with it.
- if WINDOWS and colorama:
- self.file = colorama.AnsiToWin32(self.file) # type: ignore
- # The progress code expects to be able to call self.file.isatty()
- # but the colorama.AnsiToWin32() object doesn't have that, so we'll
- # add it.
- self.file.isatty = lambda: self.file.wrapped.isatty()
- # The progress code expects to be able to call self.file.flush()
- # but the colorama.AnsiToWin32() object doesn't have that, so we'll
- # add it.
- self.file.flush = lambda: self.file.wrapped.flush()
-
-
-class BaseDownloadProgressBar(WindowsMixin, InterruptibleMixin, DownloadProgressMixin):
-
- file = sys.stdout
- message = "%(percent)d%%"
- suffix = "%(downloaded)s %(download_speed)s %(pretty_eta)s"
-
-
-class DefaultDownloadProgressBar(BaseDownloadProgressBar, _BaseBar):
- pass
-
-
-class DownloadSilentBar(BaseDownloadProgressBar, SilentBar):
- pass
-
-
-class DownloadBar(BaseDownloadProgressBar, Bar):
- pass
-
-
-class DownloadFillingCirclesBar(BaseDownloadProgressBar, FillingCirclesBar):
- pass
-
-
-class DownloadBlueEmojiProgressBar(BaseDownloadProgressBar, BlueEmojiBar):
- pass
-
-
-class DownloadProgressSpinner(
- WindowsMixin, InterruptibleMixin, DownloadProgressMixin, Spinner
-):
-
- file = sys.stdout
- suffix = "%(downloaded)s %(download_speed)s"
-
- def next_phase(self) -> str:
- if not hasattr(self, "_phaser"):
- self._phaser = itertools.cycle(self.phases)
- return next(self._phaser)
-
- def update(self) -> None:
- message = self.message % self
- phase = self.next_phase()
- suffix = self.suffix % self
- line = "".join(
- [
- message,
- " " if message else "",
- phase,
- " " if suffix else "",
- suffix,
- ]
+ total = size
+ columns = (
+ TextColumn("[progress.description]{task.description}"),
+ BarColumn(),
+ DownloadColumn(),
+ TransferSpeedColumn(),
+ TextColumn("eta"),
+ TimeRemainingColumn(),
)
- self.writeln(line)
-
+ progress = Progress(*columns, refresh_per_second=30)
+ task_id = progress.add_task(" " * (get_indentation() + 2), total=total)
+ with progress:
+ for chunk in iterable:
+ yield chunk
+ progress.update(task_id, advance=len(chunk))
-BAR_TYPES = {
- "off": (DownloadSilentBar, DownloadSilentBar),
- "on": (DefaultDownloadProgressBar, DownloadProgressSpinner),
- "ascii": (DownloadBar, DownloadProgressSpinner),
- "pretty": (DownloadFillingCirclesBar, DownloadProgressSpinner),
- "emoji": (DownloadBlueEmojiProgressBar, DownloadProgressSpinner),
-}
+def get_download_progress_renderer(
+ *, bar_type: str, size: Optional[int] = None
+) -> DownloadProgressRenderer:
+ """Get an object that can be used to render the download progress.
-def DownloadProgressProvider(progress_bar, max=None): # type: ignore
- if max is None or max == 0:
- return BAR_TYPES[progress_bar][1]().iter
+ Returns a callable, that takes an iterable to "wrap".
+ """
+ if bar_type == "on":
+ return functools.partial(_rich_progress_bar, bar_type=bar_type, size=size)
else:
- return BAR_TYPES[progress_bar][0](max=max).iter
+ return iter # no-op, when passed an iterator