console.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
# -*- coding: utf-8 -*-
"""
    werkzeug.debug.console
    ~~~~~~~~~~~~~~~~~~~~~~

    Interactive console support.

    :copyright: 2007 Pallets
    :license: BSD-3-Clause
"""
  9. import code
  10. import sys
  11. from types import CodeType
  12. from ..local import Local
  13. from ..utils import escape
  14. from .repr import debug_repr
  15. from .repr import dump
  16. from .repr import helper
# Shared ``Local`` object used by the console helpers in this module. Code
# below stores the active output buffer on it as ``stream`` and the
# interpreter currently evaluating input as ``_current_ipy``.
_local = Local()
  18. class HTMLStringO(object):
  19. """A StringO version that HTML escapes on write."""
  20. def __init__(self):
  21. self._buffer = []
  22. def isatty(self):
  23. return False
  24. def close(self):
  25. pass
  26. def flush(self):
  27. pass
  28. def seek(self, n, mode=0):
  29. pass
  30. def readline(self):
  31. if len(self._buffer) == 0:
  32. return ""
  33. ret = self._buffer[0]
  34. del self._buffer[0]
  35. return ret
  36. def reset(self):
  37. val = "".join(self._buffer)
  38. del self._buffer[:]
  39. return val
  40. def _write(self, x):
  41. if isinstance(x, bytes):
  42. x = x.decode("utf-8", "replace")
  43. self._buffer.append(x)
  44. def write(self, x):
  45. self._write(escape(x))
  46. def writelines(self, x):
  47. self._write(escape("".join(x)))
  48. class ThreadedStream(object):
  49. """Thread-local wrapper for sys.stdout for the interactive console."""
  50. @staticmethod
  51. def push():
  52. if not isinstance(sys.stdout, ThreadedStream):
  53. sys.stdout = ThreadedStream()
  54. _local.stream = HTMLStringO()
  55. @staticmethod
  56. def fetch():
  57. try:
  58. stream = _local.stream
  59. except AttributeError:
  60. return ""
  61. return stream.reset()
  62. @staticmethod
  63. def displayhook(obj):
  64. try:
  65. stream = _local.stream
  66. except AttributeError:
  67. return _displayhook(obj)
  68. # stream._write bypasses escaping as debug_repr is
  69. # already generating HTML for us.
  70. if obj is not None:
  71. _local._current_ipy.locals["_"] = obj
  72. stream._write(debug_repr(obj))
  73. def __setattr__(self, name, value):
  74. raise AttributeError("read only attribute %s" % name)
  75. def __dir__(self):
  76. return dir(sys.__stdout__)
  77. def __getattribute__(self, name):
  78. if name == "__members__":
  79. return dir(sys.__stdout__)
  80. try:
  81. stream = _local.stream
  82. except AttributeError:
  83. stream = sys.__stdout__
  84. return getattr(stream, name)
  85. def __repr__(self):
  86. return repr(sys.__stdout__)
# Add the threaded stream as display hook. The hook that was installed before
# is kept in ``_displayhook`` so that ThreadedStream.displayhook can delegate
# to it when no console stream is active.
_displayhook = sys.displayhook
sys.displayhook = ThreadedStream.displayhook
  90. class _ConsoleLoader(object):
  91. def __init__(self):
  92. self._storage = {}
  93. def register(self, code, source):
  94. self._storage[id(code)] = source
  95. # register code objects of wrapped functions too.
  96. for var in code.co_consts:
  97. if isinstance(var, CodeType):
  98. self._storage[id(var)] = source
  99. def get_source_by_code(self, code):
  100. try:
  101. return self._storage[id(code)]
  102. except KeyError:
  103. pass
  104. def _wrap_compiler(console):
  105. compile = console.compile
  106. def func(source, filename, symbol):
  107. code = compile(source, filename, symbol)
  108. console.loader.register(code, source)
  109. return code
  110. console.compile = func
  111. class _InteractiveConsole(code.InteractiveInterpreter):
  112. def __init__(self, globals, locals):
  113. _locals = dict(globals)
  114. _locals.update(locals)
  115. locals = _locals
  116. locals["dump"] = dump
  117. locals["help"] = helper
  118. locals["__loader__"] = self.loader = _ConsoleLoader()
  119. code.InteractiveInterpreter.__init__(self, locals)
  120. self.more = False
  121. self.buffer = []
  122. _wrap_compiler(self)
  123. def runsource(self, source):
  124. source = source.rstrip() + "\n"
  125. ThreadedStream.push()
  126. prompt = "... " if self.more else ">>> "
  127. try:
  128. source_to_eval = "".join(self.buffer + [source])
  129. if code.InteractiveInterpreter.runsource(
  130. self, source_to_eval, "<debugger>", "single"
  131. ):
  132. self.more = True
  133. self.buffer.append(source)
  134. else:
  135. self.more = False
  136. del self.buffer[:]
  137. finally:
  138. output = ThreadedStream.fetch()
  139. return prompt + escape(source) + output
  140. def runcode(self, code):
  141. try:
  142. exec(code, self.locals)
  143. except Exception:
  144. self.showtraceback()
  145. def showtraceback(self):
  146. from .tbtools import get_current_traceback
  147. tb = get_current_traceback(skip=1)
  148. sys.stdout._write(tb.render_summary())
  149. def showsyntaxerror(self, filename=None):
  150. from .tbtools import get_current_traceback
  151. tb = get_current_traceback(skip=4)
  152. sys.stdout._write(tb.render_summary())
  153. def write(self, data):
  154. sys.stdout.write(data)
  155. class Console(object):
  156. """An interactive console."""
  157. def __init__(self, globals=None, locals=None):
  158. if locals is None:
  159. locals = {}
  160. if globals is None:
  161. globals = {}
  162. self._ipy = _InteractiveConsole(globals, locals)
  163. def eval(self, code):
  164. _local._current_ipy = self._ipy
  165. old_sys_stdout = sys.stdout
  166. try:
  167. return self._ipy.runsource(code)
  168. finally:
  169. sys.stdout = old_sys_stdout