123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173 |
- from __future__ import absolute_import, division
- import contextlib
- import itertools
- import logging
- import sys
- import time
- from pip._vendor.progress import HIDE_CURSOR, SHOW_CURSOR
- from pip._internal.utils.compat import WINDOWS
- from pip._internal.utils.logging import get_indentation
- from pip._internal.utils.typing import MYPY_CHECK_RUNNING
- if MYPY_CHECK_RUNNING:
- from typing import Iterator, IO
- logger = logging.getLogger(__name__)
class SpinnerInterface(object):
    """Abstract contract shared by every spinner implementation.

    Both methods raise ``NotImplementedError`` here; concrete spinners
    override them.
    """

    def spin(self):
        # type: () -> None
        """Signal that work is still in progress."""
        raise NotImplementedError

    def finish(self, final_status):
        # type: (str) -> None
        """Report ``final_status`` as the outcome of the operation."""
        raise NotImplementedError
class InteractiveSpinner(SpinnerInterface):
    """Spinner that redraws its status in place on a text stream.

    The constructor writes the indented message followed by `` ... ``;
    every later status is drawn right after that prefix, replacing the
    previous one by means of backspace characters.
    """

    def __init__(self, message, file=None, spin_chars="-\\|/",
                 # Empirically, 8 updates/second looks nice
                 min_update_interval_seconds=0.125):
        # type: (str, IO[str], str, float) -> None
        """Write the message prefix and prepare the animation state.

        :param message: text written once, ahead of the status.
        :param file: stream to write to; ``sys.stdout`` when ``None``.
        :param spin_chars: characters cycled through by ``spin()``.
        :param min_update_interval_seconds: minimum delay between two
            redraws triggered by ``spin()``.
        """
        self._message = message
        # Look up sys.stdout at call time so a replaced stdout is honoured.
        self._file = sys.stdout if file is None else file
        self._rate_limiter = RateLimiter(min_update_interval_seconds)
        self._spin_cycle = itertools.cycle(spin_chars)
        self._finished = False

        indent = " " * get_indentation()
        self._file.write(indent + self._message + " ... ")
        # Number of status characters currently shown after the prefix.
        self._width = 0

    def _write(self, status):
        # type: (str) -> None
        """Replace the currently displayed status with ``status``."""
        assert not self._finished
        stream = self._file
        shown = self._width
        # Back up over the old status, blank it out with spaces, then back
        # up again so the new status starts where the old one did.
        rewind = "\b" * shown
        stream.write(rewind + " " * shown + rewind)
        # The slate is clean: draw the new status and remember its width.
        stream.write(status)
        self._width = len(status)
        stream.flush()
        self._rate_limiter.reset()

    def spin(self):
        # type: () -> None
        """Advance the animation one frame, unless finished or too soon."""
        if self._finished or not self._rate_limiter.ready():
            return
        self._write(next(self._spin_cycle))

    def finish(self, final_status):
        # type: (str) -> None
        """Show ``final_status``, end the line and stop further updates.

        Calls after the first one are ignored.
        """
        if not self._finished:
            self._write(final_status)
            self._file.write("\n")
            self._file.flush()
            self._finished = True
- # Used for dumb terminals, non-interactive installs (no tty), etc.
- # We still print updates occasionally (once every 60 seconds by default) to
- # act as a keep-alive for systems like Travis-CI that take lack-of-output as
- # an indication that a task has frozen.
class NonInteractiveSpinner(SpinnerInterface):
    """Spinner that reports progress through the module logger.

    Nothing is animated: each update is a separate INFO-level log record
    of the form ``<message>: <status>``. Updates triggered by ``spin()``
    are rate limited (once every 60 seconds by default).
    """

    def __init__(self, message, min_update_interval_seconds=60):
        # type: (str, float) -> None
        """Remember ``message`` and immediately log a "started" update.

        :param message: prefix used for every log record.
        :param min_update_interval_seconds: minimum delay between two
            "still running..." records emitted by ``spin()``.
        """
        self._message = message
        self._finished = False
        self._rate_limiter = RateLimiter(min_update_interval_seconds)
        self._update("started")

    def _update(self, status):
        # type: (str) -> None
        """Log ``status`` and restart the rate-limiting interval."""
        assert not self._finished
        self._rate_limiter.reset()
        logger.info("%s: %s", self._message, status)

    def spin(self):
        # type: () -> None
        """Log a "still running..." record if enough time has passed."""
        if self._finished:
            return
        if not self._rate_limiter.ready():
            return
        self._update("still running...")

    def finish(self, final_status):
        # type: (str) -> None
        """Log the final status and stop further updates.

        Calls after the first one are ignored.
        """
        if self._finished:
            return
        # Pass the value explicitly rather than via ``**locals()`` so the
        # format string only sees the one name it is meant to use.
        self._update(
            "finished with status '{final_status}'".format(
                final_status=final_status))
        self._finished = True
class RateLimiter(object):
    """Tell whether a minimum interval has elapsed since the last reset.

    Time is measured with ``time.time()``.
    """

    def __init__(self, min_update_interval_seconds):
        # type: (float) -> None
        """Store the interval; the last-update timestamp starts at zero."""
        # Starting at zero makes the first ready() check succeed for any
        # interval shorter than the current epoch timestamp.
        self._last_update = 0  # type: float
        self._min_update_interval_seconds = min_update_interval_seconds

    def ready(self):
        # type: () -> bool
        """Return True once at least the minimum interval has elapsed."""
        elapsed = time.time() - self._last_update
        return self._min_update_interval_seconds <= elapsed

    def reset(self):
        # type: () -> None
        """Restart the interval from the current time."""
        self._last_update = time.time()
@contextlib.contextmanager
def open_spinner(message):
    # type: (str) -> Iterator[SpinnerInterface]
    """Context manager yielding a spinner suited to the current output.

    The spinner is finished with "done" on normal exit, "canceled" on
    ``KeyboardInterrupt`` and "error" on any other ``Exception``; the
    exception is re-raised in the latter two cases.
    """
    # The interactive spinner writes straight to sys.stdout instead of going
    # through the logging system, yet it behaves as if it had level INFO:
    # it is only shown when the effective level is INFO or more verbose.
    # The non-interactive spinner logs its updates, so it always follows
    # the logging configuration.
    interactive = (
        sys.stdout.isatty() and
        logger.getEffectiveLevel() <= logging.INFO
    )
    spinner_class = (
        InteractiveSpinner if interactive else NonInteractiveSpinner
    )
    spinner = spinner_class(message)  # type: SpinnerInterface
    try:
        with hidden_cursor(sys.stdout):
            yield spinner
    except KeyboardInterrupt:
        spinner.finish("canceled")
        raise
    except Exception:
        spinner.finish("error")
        raise
    else:
        spinner.finish("done")
@contextlib.contextmanager
def hidden_cursor(file):
    # type: (IO[str]) -> Iterator[None]
    """Hide the terminal cursor for the duration of the ``with`` body.

    Control codes are written to ``file`` only when all of the following
    hold; otherwise the context manager does nothing:

    * not running on Windows -- its terminal does not support the
      hide/show cursor ANSI codes, even via colorama;
    * ``file`` is a tty -- control characters must not clutter output
      written to a file;
    * the effective log level is INFO or more verbose -- i.e. the user
      is not running with --quiet.

    See https://github.com/pypa/pip/issues/3418
    """
    emit_codes = (
        not WINDOWS and
        file.isatty() and
        logger.getEffectiveLevel() <= logging.INFO
    )
    if not emit_codes:
        yield
        return

    file.write(HIDE_CURSOR)
    try:
        yield
    finally:
        # Restore the cursor even if the body raised.
        file.write(SHOW_CURSOR)
|