testing.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382
  1. import contextlib
  2. import os
  3. import shlex
  4. import shutil
  5. import sys
  6. import tempfile
  7. from . import formatting
  8. from . import termui
  9. from . import utils
  10. from ._compat import iteritems
  11. from ._compat import PY2
  12. from ._compat import string_types
  13. if PY2:
  14. from cStringIO import StringIO
  15. else:
  16. import io
  17. from ._compat import _find_binary_reader
  18. class EchoingStdin(object):
  19. def __init__(self, input, output):
  20. self._input = input
  21. self._output = output
  22. def __getattr__(self, x):
  23. return getattr(self._input, x)
  24. def _echo(self, rv):
  25. self._output.write(rv)
  26. return rv
  27. def read(self, n=-1):
  28. return self._echo(self._input.read(n))
  29. def readline(self, n=-1):
  30. return self._echo(self._input.readline(n))
  31. def readlines(self):
  32. return [self._echo(x) for x in self._input.readlines()]
  33. def __iter__(self):
  34. return iter(self._echo(x) for x in self._input)
  35. def __repr__(self):
  36. return repr(self._input)
  37. def make_input_stream(input, charset):
  38. # Is already an input stream.
  39. if hasattr(input, "read"):
  40. if PY2:
  41. return input
  42. rv = _find_binary_reader(input)
  43. if rv is not None:
  44. return rv
  45. raise TypeError("Could not find binary reader for input stream.")
  46. if input is None:
  47. input = b""
  48. elif not isinstance(input, bytes):
  49. input = input.encode(charset)
  50. if PY2:
  51. return StringIO(input)
  52. return io.BytesIO(input)
  53. class Result(object):
  54. """Holds the captured result of an invoked CLI script."""
  55. def __init__(
  56. self, runner, stdout_bytes, stderr_bytes, exit_code, exception, exc_info=None
  57. ):
  58. #: The runner that created the result
  59. self.runner = runner
  60. #: The standard output as bytes.
  61. self.stdout_bytes = stdout_bytes
  62. #: The standard error as bytes, or None if not available
  63. self.stderr_bytes = stderr_bytes
  64. #: The exit code as integer.
  65. self.exit_code = exit_code
  66. #: The exception that happened if one did.
  67. self.exception = exception
  68. #: The traceback
  69. self.exc_info = exc_info
  70. @property
  71. def output(self):
  72. """The (standard) output as unicode string."""
  73. return self.stdout
  74. @property
  75. def stdout(self):
  76. """The standard output as unicode string."""
  77. return self.stdout_bytes.decode(self.runner.charset, "replace").replace(
  78. "\r\n", "\n"
  79. )
  80. @property
  81. def stderr(self):
  82. """The standard error as unicode string."""
  83. if self.stderr_bytes is None:
  84. raise ValueError("stderr not separately captured")
  85. return self.stderr_bytes.decode(self.runner.charset, "replace").replace(
  86. "\r\n", "\n"
  87. )
  88. def __repr__(self):
  89. return "<{} {}>".format(
  90. type(self).__name__, repr(self.exception) if self.exception else "okay"
  91. )
  92. class CliRunner(object):
  93. """The CLI runner provides functionality to invoke a Click command line
  94. script for unittesting purposes in a isolated environment. This only
  95. works in single-threaded systems without any concurrency as it changes the
  96. global interpreter state.
  97. :param charset: the character set for the input and output data. This is
  98. UTF-8 by default and should not be changed currently as
  99. the reporting to Click only works in Python 2 properly.
  100. :param env: a dictionary with environment variables for overriding.
  101. :param echo_stdin: if this is set to `True`, then reading from stdin writes
  102. to stdout. This is useful for showing examples in
  103. some circumstances. Note that regular prompts
  104. will automatically echo the input.
  105. :param mix_stderr: if this is set to `False`, then stdout and stderr are
  106. preserved as independent streams. This is useful for
  107. Unix-philosophy apps that have predictable stdout and
  108. noisy stderr, such that each may be measured
  109. independently
  110. """
  111. def __init__(self, charset=None, env=None, echo_stdin=False, mix_stderr=True):
  112. if charset is None:
  113. charset = "utf-8"
  114. self.charset = charset
  115. self.env = env or {}
  116. self.echo_stdin = echo_stdin
  117. self.mix_stderr = mix_stderr
  118. def get_default_prog_name(self, cli):
  119. """Given a command object it will return the default program name
  120. for it. The default is the `name` attribute or ``"root"`` if not
  121. set.
  122. """
  123. return cli.name or "root"
  124. def make_env(self, overrides=None):
  125. """Returns the environment overrides for invoking a script."""
  126. rv = dict(self.env)
  127. if overrides:
  128. rv.update(overrides)
  129. return rv
  130. @contextlib.contextmanager
  131. def isolation(self, input=None, env=None, color=False):
  132. """A context manager that sets up the isolation for invoking of a
  133. command line tool. This sets up stdin with the given input data
  134. and `os.environ` with the overrides from the given dictionary.
  135. This also rebinds some internals in Click to be mocked (like the
  136. prompt functionality).
  137. This is automatically done in the :meth:`invoke` method.
  138. .. versionadded:: 4.0
  139. The ``color`` parameter was added.
  140. :param input: the input stream to put into sys.stdin.
  141. :param env: the environment overrides as dictionary.
  142. :param color: whether the output should contain color codes. The
  143. application can still override this explicitly.
  144. """
  145. input = make_input_stream(input, self.charset)
  146. old_stdin = sys.stdin
  147. old_stdout = sys.stdout
  148. old_stderr = sys.stderr
  149. old_forced_width = formatting.FORCED_WIDTH
  150. formatting.FORCED_WIDTH = 80
  151. env = self.make_env(env)
  152. if PY2:
  153. bytes_output = StringIO()
  154. if self.echo_stdin:
  155. input = EchoingStdin(input, bytes_output)
  156. sys.stdout = bytes_output
  157. if not self.mix_stderr:
  158. bytes_error = StringIO()
  159. sys.stderr = bytes_error
  160. else:
  161. bytes_output = io.BytesIO()
  162. if self.echo_stdin:
  163. input = EchoingStdin(input, bytes_output)
  164. input = io.TextIOWrapper(input, encoding=self.charset)
  165. sys.stdout = io.TextIOWrapper(bytes_output, encoding=self.charset)
  166. if not self.mix_stderr:
  167. bytes_error = io.BytesIO()
  168. sys.stderr = io.TextIOWrapper(bytes_error, encoding=self.charset)
  169. if self.mix_stderr:
  170. sys.stderr = sys.stdout
  171. sys.stdin = input
  172. def visible_input(prompt=None):
  173. sys.stdout.write(prompt or "")
  174. val = input.readline().rstrip("\r\n")
  175. sys.stdout.write("{}\n".format(val))
  176. sys.stdout.flush()
  177. return val
  178. def hidden_input(prompt=None):
  179. sys.stdout.write("{}\n".format(prompt or ""))
  180. sys.stdout.flush()
  181. return input.readline().rstrip("\r\n")
  182. def _getchar(echo):
  183. char = sys.stdin.read(1)
  184. if echo:
  185. sys.stdout.write(char)
  186. sys.stdout.flush()
  187. return char
  188. default_color = color
  189. def should_strip_ansi(stream=None, color=None):
  190. if color is None:
  191. return not default_color
  192. return not color
  193. old_visible_prompt_func = termui.visible_prompt_func
  194. old_hidden_prompt_func = termui.hidden_prompt_func
  195. old__getchar_func = termui._getchar
  196. old_should_strip_ansi = utils.should_strip_ansi
  197. termui.visible_prompt_func = visible_input
  198. termui.hidden_prompt_func = hidden_input
  199. termui._getchar = _getchar
  200. utils.should_strip_ansi = should_strip_ansi
  201. old_env = {}
  202. try:
  203. for key, value in iteritems(env):
  204. old_env[key] = os.environ.get(key)
  205. if value is None:
  206. try:
  207. del os.environ[key]
  208. except Exception:
  209. pass
  210. else:
  211. os.environ[key] = value
  212. yield (bytes_output, not self.mix_stderr and bytes_error)
  213. finally:
  214. for key, value in iteritems(old_env):
  215. if value is None:
  216. try:
  217. del os.environ[key]
  218. except Exception:
  219. pass
  220. else:
  221. os.environ[key] = value
  222. sys.stdout = old_stdout
  223. sys.stderr = old_stderr
  224. sys.stdin = old_stdin
  225. termui.visible_prompt_func = old_visible_prompt_func
  226. termui.hidden_prompt_func = old_hidden_prompt_func
  227. termui._getchar = old__getchar_func
  228. utils.should_strip_ansi = old_should_strip_ansi
  229. formatting.FORCED_WIDTH = old_forced_width
  230. def invoke(
  231. self,
  232. cli,
  233. args=None,
  234. input=None,
  235. env=None,
  236. catch_exceptions=True,
  237. color=False,
  238. **extra
  239. ):
  240. """Invokes a command in an isolated environment. The arguments are
  241. forwarded directly to the command line script, the `extra` keyword
  242. arguments are passed to the :meth:`~clickpkg.Command.main` function of
  243. the command.
  244. This returns a :class:`Result` object.
  245. .. versionadded:: 3.0
  246. The ``catch_exceptions`` parameter was added.
  247. .. versionchanged:: 3.0
  248. The result object now has an `exc_info` attribute with the
  249. traceback if available.
  250. .. versionadded:: 4.0
  251. The ``color`` parameter was added.
  252. :param cli: the command to invoke
  253. :param args: the arguments to invoke. It may be given as an iterable
  254. or a string. When given as string it will be interpreted
  255. as a Unix shell command. More details at
  256. :func:`shlex.split`.
  257. :param input: the input data for `sys.stdin`.
  258. :param env: the environment overrides.
  259. :param catch_exceptions: Whether to catch any other exceptions than
  260. ``SystemExit``.
  261. :param extra: the keyword arguments to pass to :meth:`main`.
  262. :param color: whether the output should contain color codes. The
  263. application can still override this explicitly.
  264. """
  265. exc_info = None
  266. with self.isolation(input=input, env=env, color=color) as outstreams:
  267. exception = None
  268. exit_code = 0
  269. if isinstance(args, string_types):
  270. args = shlex.split(args)
  271. try:
  272. prog_name = extra.pop("prog_name")
  273. except KeyError:
  274. prog_name = self.get_default_prog_name(cli)
  275. try:
  276. cli.main(args=args or (), prog_name=prog_name, **extra)
  277. except SystemExit as e:
  278. exc_info = sys.exc_info()
  279. exit_code = e.code
  280. if exit_code is None:
  281. exit_code = 0
  282. if exit_code != 0:
  283. exception = e
  284. if not isinstance(exit_code, int):
  285. sys.stdout.write(str(exit_code))
  286. sys.stdout.write("\n")
  287. exit_code = 1
  288. except Exception as e:
  289. if not catch_exceptions:
  290. raise
  291. exception = e
  292. exit_code = 1
  293. exc_info = sys.exc_info()
  294. finally:
  295. sys.stdout.flush()
  296. stdout = outstreams[0].getvalue()
  297. if self.mix_stderr:
  298. stderr = None
  299. else:
  300. stderr = outstreams[1].getvalue()
  301. return Result(
  302. runner=self,
  303. stdout_bytes=stdout,
  304. stderr_bytes=stderr,
  305. exit_code=exit_code,
  306. exception=exception,
  307. exc_info=exc_info,
  308. )
  309. @contextlib.contextmanager
  310. def isolated_filesystem(self):
  311. """A context manager that creates a temporary folder and changes
  312. the current working directory to it for isolated filesystem tests.
  313. """
  314. cwd = os.getcwd()
  315. t = tempfile.mkdtemp()
  316. os.chdir(t)
  317. try:
  318. yield t
  319. finally:
  320. os.chdir(cwd)
  321. try:
  322. shutil.rmtree(t)
  323. except (OSError, IOError): # noqa: B014
  324. pass