123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382 |
- import contextlib
- import os
- import shlex
- import shutil
- import sys
- import tempfile
- from . import formatting
- from . import termui
- from . import utils
- from ._compat import iteritems
- from ._compat import PY2
- from ._compat import string_types
- if PY2:
- from cStringIO import StringIO
- else:
- import io
- from ._compat import _find_binary_reader
class EchoingStdin(object):
    """Proxy around an input stream that mirrors everything read to an output.

    Data obtained through :meth:`read`, :meth:`readline`, :meth:`readlines`
    or iteration is written to ``output`` before being handed back to the
    caller.  Every other attribute is looked up on the wrapped input stream.
    """

    def __init__(self, input, output):
        self._input = input
        self._output = output

    def __getattr__(self, x):
        # Only consulted for names not found on the proxy itself, so the
        # methods defined below always win over the wrapped stream's.
        return getattr(self._input, x)

    def _echo(self, rv):
        """Write ``rv`` to the output stream and return it unchanged."""
        self._output.write(rv)
        return rv

    def read(self, n=-1):
        data = self._input.read(n)
        return self._echo(data)

    def readline(self, n=-1):
        line = self._input.readline(n)
        return self._echo(line)

    def readlines(self):
        echoed = []
        for line in self._input.readlines():
            echoed.append(self._echo(line))
        return echoed

    def __iter__(self):
        # Lazy: each line is echoed only when the consumer pulls it.
        return (self._echo(line) for line in self._input)

    def __repr__(self):
        return repr(self._input)
def make_input_stream(input, charset):
    """Turn ``input`` into a stream suitable for use as the test's stdin.

    :param input: an object with a ``read`` attribute (treated as an
                  existing stream), ``None`` (treated as empty input),
                  ``bytes``, or a text string that is encoded with
                  ``charset``.
    :param charset: the encoding applied to non-bytes input.
    :raises TypeError: on Python 3, if ``_find_binary_reader`` returns
                       ``None`` for a stream-like ``input``.
    """
    if hasattr(input, "read"):
        # Already an input stream.  Python 2 uses it as is; otherwise the
        # stream handed back is whatever ``_find_binary_reader`` returns.
        if PY2:
            return input
        binary_reader = _find_binary_reader(input)
        if binary_reader is None:
            raise TypeError("Could not find binary reader for input stream.")
        return binary_reader

    if input is None:
        payload = b""
    elif isinstance(input, bytes):
        payload = input
    else:
        payload = input.encode(charset)

    # Only the selected branch is evaluated, so ``StringIO`` is never looked
    # up on Python 3 (and ``io`` never on Python 2).
    stream_factory = StringIO if PY2 else io.BytesIO
    return stream_factory(payload)
class Result(object):
    """Holds the captured result of an invoked CLI script."""

    def __init__(
        self, runner, stdout_bytes, stderr_bytes, exit_code, exception, exc_info=None
    ):
        # The runner that produced this result; its ``charset`` is used for
        # decoding the captured bytes.
        self.runner = runner
        # Raw bytes captured from standard output.
        self.stdout_bytes = stdout_bytes
        # Raw bytes captured from standard error, or None when stderr was
        # not captured separately.
        self.stderr_bytes = stderr_bytes
        # Integer exit code of the invocation.
        self.exit_code = exit_code
        # The exception that occurred, if any.
        self.exception = exception
        # Traceback information for the exception, if available.
        self.exc_info = exc_info

    @property
    def output(self):
        """The (standard) output as unicode string."""
        return self.stdout

    @property
    def stdout(self):
        """The standard output as unicode string.

        Undecodable bytes are replaced and ``\\r\\n`` is normalised to
        ``\\n``.
        """
        text = self.stdout_bytes.decode(self.runner.charset, "replace")
        return text.replace("\r\n", "\n")

    @property
    def stderr(self):
        """The standard error as unicode string.

        :raises ValueError: if stderr was not captured separately.
        """
        raw = self.stderr_bytes
        if raw is None:
            raise ValueError("stderr not separately captured")
        text = raw.decode(self.runner.charset, "replace")
        return text.replace("\r\n", "\n")

    def __repr__(self):
        if self.exception:
            status = repr(self.exception)
        else:
            status = "okay"
        return "<{} {}>".format(type(self).__name__, status)
class CliRunner(object):
    """The CLI runner provides functionality to invoke a Click command line
    script for unittesting purposes in an isolated environment.  This only
    works in single-threaded systems without any concurrency as it changes the
    global interpreter state.

    :param charset: the character set for the input and output data.  This is
                    UTF-8 by default and should not be changed currently as
                    the reporting to Click only works in Python 2 properly.
    :param env: a dictionary with environment variables for overriding.
    :param echo_stdin: if this is set to `True`, then reading from stdin writes
                       to stdout.  This is useful for showing examples in
                       some circumstances.  Note that regular prompts
                       will automatically echo the input.
    :param mix_stderr: if this is set to `False`, then stdout and stderr are
                       preserved as independent streams.  This is useful for
                       Unix-philosophy apps that have predictable stdout and
                       noisy stderr, such that each may be measured
                       independently
    """

    def __init__(self, charset=None, env=None, echo_stdin=False, mix_stderr=True):
        if charset is None:
            charset = "utf-8"
        self.charset = charset
        self.env = env or {}
        self.echo_stdin = echo_stdin
        self.mix_stderr = mix_stderr

    def get_default_prog_name(self, cli):
        """Given a command object it will return the default program name
        for it.  The default is the `name` attribute or ``"root"`` if not
        set.
        """
        return cli.name or "root"

    def make_env(self, overrides=None):
        """Returns the environment overrides for invoking a script.

        The result is a fresh dict: the runner's own ``env`` updated with
        ``overrides``; ``self.env`` itself is never mutated.
        """
        rv = dict(self.env)
        if overrides:
            rv.update(overrides)
        return rv

    @contextlib.contextmanager
    def isolation(self, input=None, env=None, color=False):
        """A context manager that sets up the isolation for invoking of a
        command line tool.  This sets up stdin with the given input data
        and `os.environ` with the overrides from the given dictionary.
        This also rebinds some internals in Click to be mocked (like the
        prompt functionality).

        This is automatically done in the :meth:`invoke` method.

        Yields a 2-tuple ``(bytes_output, bytes_error)`` of the capture
        buffers; the second item is ``False`` when ``mix_stderr`` is enabled.

        .. versionadded:: 4.0
           The ``color`` parameter was added.

        :param input: the input stream to put into sys.stdin.
        :param env: the environment overrides as dictionary.
        :param color: whether the output should contain color codes.  The
                      application can still override this explicitly.
        """
        input = make_input_stream(input, self.charset)

        # Snapshot every piece of global state that is about to be patched
        # *before* changing anything, so the ``finally`` clause below can
        # always restore it, even if the setup itself fails half-way (for
        # example because of an unknown charset).
        old_stdin = sys.stdin
        old_stdout = sys.stdout
        old_stderr = sys.stderr
        old_forced_width = formatting.FORCED_WIDTH
        old_visible_prompt_func = termui.visible_prompt_func
        old_hidden_prompt_func = termui.hidden_prompt_func
        old__getchar_func = termui._getchar
        old_should_strip_ansi = utils.should_strip_ansi
        old_env = {}

        try:
            formatting.FORCED_WIDTH = 80

            env = self.make_env(env)

            if PY2:
                bytes_output = StringIO()
                if self.echo_stdin:
                    input = EchoingStdin(input, bytes_output)
                sys.stdout = bytes_output
                if not self.mix_stderr:
                    bytes_error = StringIO()
                    sys.stderr = bytes_error
            else:
                bytes_output = io.BytesIO()
                if self.echo_stdin:
                    input = EchoingStdin(input, bytes_output)
                input = io.TextIOWrapper(input, encoding=self.charset)
                sys.stdout = io.TextIOWrapper(bytes_output, encoding=self.charset)
                if not self.mix_stderr:
                    bytes_error = io.BytesIO()
                    sys.stderr = io.TextIOWrapper(bytes_error, encoding=self.charset)

            if self.mix_stderr:
                # Both streams share one buffer, so output is interleaved.
                sys.stderr = sys.stdout

            sys.stdin = input

            def visible_input(prompt=None):
                # Echo the typed value after the prompt, like a terminal would.
                sys.stdout.write(prompt or "")
                val = input.readline().rstrip("\r\n")
                sys.stdout.write("{}\n".format(val))
                sys.stdout.flush()
                return val

            def hidden_input(prompt=None):
                # Hidden prompts show the prompt only, never the value.
                sys.stdout.write("{}\n".format(prompt or ""))
                sys.stdout.flush()
                return input.readline().rstrip("\r\n")

            def _getchar(echo):
                char = sys.stdin.read(1)
                if echo:
                    sys.stdout.write(char)
                    sys.stdout.flush()
                return char

            default_color = color

            def should_strip_ansi(stream=None, color=None):
                # An explicit ``color`` wins over the isolation-wide default.
                if color is None:
                    return not default_color
                return not color

            termui.visible_prompt_func = visible_input
            termui.hidden_prompt_func = hidden_input
            termui._getchar = _getchar
            utils.should_strip_ansi = should_strip_ansi

            for key, value in iteritems(env):
                # Remember the previous value (None if unset) for restoration.
                old_env[key] = os.environ.get(key)
                if value is None:
                    # A ``None`` override means "remove the variable";
                    # removal is best-effort.
                    try:
                        del os.environ[key]
                    except Exception:
                        pass
                else:
                    os.environ[key] = value
            yield (bytes_output, not self.mix_stderr and bytes_error)
        finally:
            for key, value in iteritems(old_env):
                if value is None:
                    try:
                        del os.environ[key]
                    except Exception:
                        pass
                else:
                    os.environ[key] = value
            sys.stdout = old_stdout
            sys.stderr = old_stderr
            sys.stdin = old_stdin
            termui.visible_prompt_func = old_visible_prompt_func
            termui.hidden_prompt_func = old_hidden_prompt_func
            termui._getchar = old__getchar_func
            utils.should_strip_ansi = old_should_strip_ansi
            formatting.FORCED_WIDTH = old_forced_width

    def invoke(
        self,
        cli,
        args=None,
        input=None,
        env=None,
        catch_exceptions=True,
        color=False,
        **extra
    ):
        """Invokes a command in an isolated environment.  The arguments are
        forwarded directly to the command line script, the `extra` keyword
        arguments are passed to the :meth:`~clickpkg.Command.main` function of
        the command.

        This returns a :class:`Result` object.

        .. versionadded:: 3.0
           The ``catch_exceptions`` parameter was added.

        .. versionchanged:: 3.0
           The result object now has an `exc_info` attribute with the
           traceback if available.

        .. versionadded:: 4.0
           The ``color`` parameter was added.

        :param cli: the command to invoke
        :param args: the arguments to invoke.  It may be given as an iterable
                     or a string.  When given as string it will be interpreted
                     as a Unix shell command.  More details at
                     :func:`shlex.split`.
        :param input: the input data for `sys.stdin`.
        :param env: the environment overrides.
        :param catch_exceptions: Whether to catch any other exceptions than
                                 ``SystemExit``.
        :param extra: the keyword arguments to pass to :meth:`main`.
        :param color: whether the output should contain color codes.  The
                      application can still override this explicitly.
        """
        exc_info = None
        with self.isolation(input=input, env=env, color=color) as outstreams:
            exception = None
            exit_code = 0

            if isinstance(args, string_types):
                args = shlex.split(args)

            try:
                prog_name = extra.pop("prog_name")
            except KeyError:
                prog_name = self.get_default_prog_name(cli)

            try:
                cli.main(args=args or (), prog_name=prog_name, **extra)
            except SystemExit as e:
                exc_info = sys.exc_info()
                exit_code = e.code
                if exit_code is None:
                    exit_code = 0

                if exit_code != 0:
                    exception = e

                if not isinstance(exit_code, int):
                    # A non-integer exit "code" is written to the captured
                    # output and the exit code itself becomes 1.
                    sys.stdout.write(str(exit_code))
                    sys.stdout.write("\n")
                    exit_code = 1

            except Exception as e:
                if not catch_exceptions:
                    raise
                exception = e
                exit_code = 1
                exc_info = sys.exc_info()
            finally:
                # Flush both text layers before reading the byte buffers.
                # When stderr is captured separately it has its own text
                # wrapper, whose pending writes would otherwise be missing
                # from the result.
                sys.stdout.flush()
                sys.stderr.flush()
                stdout = outstreams[0].getvalue()
                if self.mix_stderr:
                    stderr = None
                else:
                    stderr = outstreams[1].getvalue()

        return Result(
            runner=self,
            stdout_bytes=stdout,
            stderr_bytes=stderr,
            exit_code=exit_code,
            exception=exception,
            exc_info=exc_info,
        )

    @contextlib.contextmanager
    def isolated_filesystem(self):
        """A context manager that creates a temporary folder and changes
        the current working directory to it for isolated filesystem tests.

        On exit the previous working directory is restored and the temporary
        folder is removed on a best-effort basis.
        """
        cwd = os.getcwd()
        t = tempfile.mkdtemp()
        try:
            # Inside the ``try`` so the temporary folder is still cleaned up
            # if changing into it fails.
            os.chdir(t)
            yield t
        finally:
            os.chdir(cwd)
            try:
                shutil.rmtree(t)
            except (OSError, IOError):  # noqa: B014
                pass
|