123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280 |
- from __future__ import absolute_import
- import logging
- import os
- import subprocess
- from pip._vendor.six.moves import shlex_quote
- from pip._internal.cli.spinners import SpinnerInterface, open_spinner
- from pip._internal.exceptions import InstallationError
- from pip._internal.utils.compat import console_to_str, str_to_display
- from pip._internal.utils.logging import subprocess_logger
- from pip._internal.utils.misc import HiddenText, path_to_display
- from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
    # These names are only needed by the type comments below, so they are
    # imported solely while type-checking.
    from typing import (
        Any, Callable, Iterable, List, Mapping, Optional, Text, Union,
    )

    # A command line whose items are either plain strings or HiddenText
    # values. Only defined during type-checking, so runtime code must not
    # reference it (see the isinstance() check in make_command()).
    CommandArgs = List[Union[str, HiddenText]]


# Divider line appended after the captured subprocess output in the error
# message built by make_subprocess_output_error().
LOG_DIVIDER = '----------------------------------------'
def make_command(*args):
    # type: (Union[str, HiddenText, CommandArgs]) -> CommandArgs
    """
    Create a CommandArgs object.

    Each positional argument is either a single command argument (a str
    or HiddenText) or a list of such arguments. List arguments are
    flattened one level into the result; anything else is appended as-is.

    :return: A newly created list. Lists passed in are never modified.
    """
    command_args = []  # type: CommandArgs
    for arg in args:
        # Check for list instead of CommandArgs since CommandArgs is
        # only known during type-checking.
        if isinstance(arg, list):
            command_args.extend(arg)
        else:
            # Otherwise, arg is str or HiddenText.
            command_args.append(arg)

    return command_args
def format_command_args(args):
    # type: (Union[List[str], CommandArgs]) -> str
    """
    Format command arguments for display.

    :param args: The command arguments; each item is a str or HiddenText.
    :return: The arguments, each passed through shlex_quote(), joined by
        single spaces.
    """
    # For HiddenText arguments, display the redacted form by calling str().
    # Also, we don't apply str() to arguments that aren't HiddenText since
    # this can trigger a UnicodeDecodeError in Python 2 if the argument
    # has type unicode and includes a non-ascii character.  (The type
    # checker doesn't ensure the annotations are correct in all cases.)
    return ' '.join(
        shlex_quote(str(arg)) if isinstance(arg, HiddenText)
        else shlex_quote(arg) for arg in args
    )
def reveal_command_args(args):
    # type: (Union[List[str], CommandArgs]) -> List[str]
    """
    Return the arguments in their raw, unredacted form.

    :param args: The command arguments; each item is a str or HiddenText.
    :return: A new list in which every HiddenText item has been replaced
        by its ``secret`` attribute and every other item is kept as-is.
    """
    return [
        arg.secret if isinstance(arg, HiddenText) else arg for arg in args
    ]
def make_subprocess_output_error(
    cmd_args,  # type: Union[List[str], CommandArgs]
    cwd,  # type: Optional[str]
    lines,  # type: List[Text]
    exit_status,  # type: int
):
    # type: (...) -> Text
    """
    Create and return the error message to use to log a subprocess error
    with command output.

    :param cmd_args: The command that was run; formatted for display with
        format_command_args().
    :param cwd: The working directory the command was run in, if any.
    :param lines: A list of lines, each ending with a newline.
    :param exit_status: The exit status of the subprocess.
    :return: A text message containing the exit status, the command, the
        working directory, the number of output lines, the output itself
        and a trailing LOG_DIVIDER.
    """
    command = format_command_args(cmd_args)
    # Convert `command` and `cwd` to text (unicode in Python 2) so we can use
    # them as arguments in the unicode format string below. This avoids
    # "UnicodeDecodeError: 'ascii' codec can't decode byte ..." in Python 2
    # if either contains a non-ascii character.
    command_display = str_to_display(command, desc='command bytes')
    cwd_display = path_to_display(cwd)

    # We know the joined output value ends in a newline.
    output = ''.join(lines)
    msg = (
        # Use a unicode string to avoid "UnicodeEncodeError: 'ascii'
        # codec can't encode character ..." in Python 2 when a format
        # argument (e.g. `output`) has a non-ascii character.
        u'Command errored out with exit status {exit_status}:\n'
        ' command: {command_display}\n'
        ' cwd: {cwd_display}\n'
        'Complete output ({line_count} lines):\n{output}{divider}'
    ).format(
        exit_status=exit_status,
        command_display=command_display,
        cwd_display=cwd_display,
        line_count=len(lines),
        output=output,
        divider=LOG_DIVIDER,
    )
    return msg
def call_subprocess(
    cmd,  # type: Union[List[str], CommandArgs]
    show_stdout=False,  # type: bool
    cwd=None,  # type: Optional[str]
    on_returncode='raise',  # type: str
    extra_ok_returncodes=None,  # type: Optional[Iterable[int]]
    command_desc=None,  # type: Optional[str]
    extra_environ=None,  # type: Optional[Mapping[str, Any]]
    unset_environ=None,  # type: Optional[Iterable[str]]
    spinner=None,  # type: Optional[SpinnerInterface]
    log_failed_cmd=True  # type: Optional[bool]
):
    # type: (...) -> Text
    """
    Run a command in a subprocess, logging its combined stdout/stderr
    line by line as it is produced, and return the collected output.

    Args:
      cmd: the command to run. HiddenText items are passed to the
        subprocess in their unredacted form but displayed redacted.
      show_stdout: if true, use INFO to log the subprocess's stderr and
        stdout streams.  Otherwise, use DEBUG.  Defaults to False.
      cwd: the working directory passed to subprocess.Popen().
      on_returncode: what to do when the subprocess exits with a return
        code that is neither 0 nor in extra_ok_returncodes: 'raise'
        (raise InstallationError), 'warn' (log a warning) or 'ignore'.
        The value is only inspected when such a return code occurs.
      extra_ok_returncodes: an iterable of integer return codes that are
        acceptable, in addition to 0. Defaults to None, which means [].
      command_desc: the text used to describe the command in log and
        error messages. Defaults to format_command_args(cmd).
      extra_environ: environment variables to add on top of a copy of
        os.environ for the subprocess.
      unset_environ: an iterable of environment variable names to unset
        prior to calling subprocess.Popen().
      spinner: a spinner to update while output is being read; only used
        when the subprocess output is not itself visible at the
        effective log level.
      log_failed_cmd: if false, failed commands are not logged, only raised.

    Returns:
      The subprocess output as text: every line read, stripped of
      trailing whitespace and terminated with a single newline.

    Raises:
      InstallationError: if the subprocess had an error return code and
        on_returncode is 'raise'.
      ValueError: if the subprocess had an error return code and
        on_returncode is not one of 'raise', 'warn' or 'ignore'.
      Exception: whatever starting the subprocess raised; it is logged
        (if log_failed_cmd) and re-raised unchanged.
    """
    if extra_ok_returncodes is None:
        extra_ok_returncodes = []
    if unset_environ is None:
        unset_environ = []
    # Most places in pip use show_stdout=False. What this means is--
    #
    # - We connect the child's output (combined stderr and stdout) to a
    #   single pipe, which we read.
    # - We log this output to stderr at DEBUG level as it is received.
    # - If DEBUG logging isn't enabled (e.g. if --verbose logging wasn't
    #   requested), then we show a spinner so the user can still see the
    #   subprocess is in progress.
    # - If the subprocess exits with an error, we log the output to stderr
    #   at ERROR level if it hasn't already been displayed to the console
    #   (e.g. if --verbose logging wasn't enabled).  This way we don't log
    #   the output to the console twice.
    #
    # If show_stdout=True, then the above is still done, but with DEBUG
    # replaced by INFO.
    if show_stdout:
        # Then log the subprocess output at INFO level.
        log_subprocess = subprocess_logger.info
        used_level = logging.INFO
    else:
        # Then log the subprocess output using DEBUG.  This also ensures
        # it will be logged to the log file (aka user_log), if enabled.
        log_subprocess = subprocess_logger.debug
        used_level = logging.DEBUG

    # Whether the subprocess will be visible in the console.
    showing_subprocess = subprocess_logger.getEffectiveLevel() <= used_level

    # Only use the spinner if we're not showing the subprocess output
    # and we have a spinner.
    use_spinner = not showing_subprocess and spinner is not None

    if command_desc is None:
        command_desc = format_command_args(cmd)

    log_subprocess("Running command %s", command_desc)
    env = os.environ.copy()
    if extra_environ:
        env.update(extra_environ)
    for name in unset_environ:
        env.pop(name, None)
    try:
        proc = subprocess.Popen(
            # Convert HiddenText objects to the underlying str.
            reveal_command_args(cmd),
            stderr=subprocess.STDOUT, stdin=subprocess.PIPE,
            stdout=subprocess.PIPE, cwd=cwd, env=env,
        )
        # Both pipes were requested above; the asserts narrow the
        # Optional types for the type checker.
        assert proc.stdin
        assert proc.stdout
        # The child gets no input from us.
        proc.stdin.close()
    except Exception as exc:
        if log_failed_cmd:
            subprocess_logger.critical(
                "Error %s while executing command %s", exc, command_desc,
            )
        raise
    all_output = []
    # Read, log and wait inside one try/finally so that the stdout pipe is
    # closed even if decoding, logging or the spinner raises part-way
    # through the output.
    try:
        while True:
            # The "line" value is a unicode string in Python 2.
            line = console_to_str(proc.stdout.readline())
            if not line:
                break
            line = line.rstrip()
            all_output.append(line + '\n')

            # Show the line immediately.
            log_subprocess(line)

            # Update the spinner.
            if use_spinner:
                assert spinner
                spinner.spin()
        proc.wait()
    finally:
        if proc.stdout:
            proc.stdout.close()
    # A non-zero return code counts as an error unless explicitly allowed.
    proc_had_error = (
        proc.returncode and proc.returncode not in extra_ok_returncodes
    )
    if use_spinner:
        assert spinner
        if proc_had_error:
            spinner.finish("error")
        else:
            spinner.finish("done")
    if proc_had_error:
        if on_returncode == 'raise':
            if not showing_subprocess and log_failed_cmd:
                # Then the subprocess streams haven't been logged to the
                # console yet.
                msg = make_subprocess_output_error(
                    cmd_args=cmd,
                    cwd=cwd,
                    lines=all_output,
                    exit_status=proc.returncode,
                )
                subprocess_logger.error(msg)
            exc_msg = (
                'Command errored out with exit status {}: {} '
                'Check the logs for full command output.'
            ).format(proc.returncode, command_desc)
            raise InstallationError(exc_msg)
        elif on_returncode == 'warn':
            subprocess_logger.warning(
                'Command "%s" had error code %s in %s',
                command_desc,
                proc.returncode,
                cwd,
            )
        elif on_returncode == 'ignore':
            pass
        else:
            raise ValueError('Invalid value: on_returncode={!r}'.format(
                             on_returncode))
    return ''.join(all_output)
def runner_with_spinner_message(message):
    # type: (str) -> Callable[..., None]
    """Provide a subprocess_runner that shows a spinner message.

    Intended for use with pep517's Pep517HookCaller. Thus, the runner has
    an API that matches what's expected by Pep517HookCaller.subprocess_runner.

    :param message: The message passed to open_spinner() on every call of
        the returned runner.
    :return: A callable ``runner(cmd, cwd=None, extra_environ=None)`` that
        runs ``cmd`` through call_subprocess() and returns None.
    """

    def runner(
        cmd,  # type: List[str]
        cwd=None,  # type: Optional[str]
        extra_environ=None  # type: Optional[Mapping[str, Any]]
    ):
        # type: (...) -> None
        # A fresh spinner is opened for each invocation of the runner.
        with open_spinner(message) as spinner:
            call_subprocess(
                cmd,
                cwd=cwd,
                extra_environ=extra_environ,
                spinner=spinner,
            )

    return runner
|