123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370 |
- # -*- coding: utf-8 -*-
- # This module is based on the excellent work by Adam Bartoš who
- # provided a lot of what went into the implementation here in
- # the discussion to issue1602 in the Python bug tracker.
- #
- # There are some general differences in regards to how this works
- # compared to the original patches as we do not need to patch
- # the entire interpreter but just work in our little world of
- # echo and prompt.
- import ctypes
- import io
- import os
- import sys
- import time
- import zlib
- from ctypes import byref
- from ctypes import c_char
- from ctypes import c_char_p
- from ctypes import c_int
- from ctypes import c_ssize_t
- from ctypes import c_ulong
- from ctypes import c_void_p
- from ctypes import POINTER
- from ctypes import py_object
- from ctypes import windll
- from ctypes import WinError
- from ctypes import WINFUNCTYPE
- from ctypes.wintypes import DWORD
- from ctypes.wintypes import HANDLE
- from ctypes.wintypes import LPCWSTR
- from ctypes.wintypes import LPWSTR
- import msvcrt
- from ._compat import _NonClosingTextIOWrapper
- from ._compat import PY2
- from ._compat import text_type
# ``ctypes.pythonapi`` may be missing on some interpreters; in that case the
# name is bound to ``None`` so later code can detect that the buffer API is
# unavailable.
try:
    from ctypes import pythonapi
except ImportError:
    pythonapi = None
else:
    PyObject_GetBuffer = pythonapi.PyObject_GetBuffer
    PyBuffer_Release = pythonapi.PyBuffer_Release
# Pointer type used for the ``shape``/``strides``/``suboffsets`` members of
# ``Py_buffer``.
c_ssize_p = POINTER(c_ssize_t)

# Win32 entry points looked up by attribute access on kernel32.
kernel32 = windll.kernel32
GetStdHandle = kernel32.GetStdHandle
ReadConsoleW = kernel32.ReadConsoleW
WriteConsoleW = kernel32.WriteConsoleW
GetConsoleMode = kernel32.GetConsoleMode
GetLastError = kernel32.GetLastError

# Win32 entry points that need an explicit prototype.
GetCommandLineW = WINFUNCTYPE(LPWSTR)(("GetCommandLineW", kernel32))
CommandLineToArgvW = WINFUNCTYPE(POINTER(LPWSTR), LPCWSTR, POINTER(c_int))(
    ("CommandLineToArgvW", windll.shell32)
)
LocalFree = WINFUNCTYPE(c_void_p, c_void_p)(("LocalFree", kernel32))

# Handles of the three standard streams, fetched once at import time.
STDIN_HANDLE = GetStdHandle(-10)
STDOUT_HANDLE = GetStdHandle(-11)
STDERR_HANDLE = GetStdHandle(-12)

# Flags passed to PyObject_GetBuffer.
PyBUF_SIMPLE = 0
PyBUF_WRITABLE = 1

# Windows error codes this module looks at.
ERROR_SUCCESS = 0
ERROR_NOT_ENOUGH_MEMORY = 8
ERROR_OPERATION_ABORTED = 995

# File descriptor numbers of the standard streams.
STDIN_FILENO = 0
STDOUT_FILENO = 1
STDERR_FILENO = 2

# Byte that marks end-of-input when it is the first byte read from the console.
EOF = b"\x1a"
# Upper bound on the number of bytes handed to a single console write.
MAX_BYTES_WRITTEN = 32767
class Py_buffer(ctypes.Structure):
    """ctypes description of the structure filled in by ``PyObject_GetBuffer``."""

    _fields_ = [
        ("buf", c_void_p),
        ("obj", py_object),
        ("len", c_ssize_t),
        ("itemsize", c_ssize_t),
        ("readonly", c_int),
        ("ndim", c_int),
        ("format", c_char_p),
        ("shape", c_ssize_p),
        ("strides", c_ssize_p),
        ("suboffsets", c_ssize_p),
    ]

    # On Python 2 an extra ``smalltable`` member sits just before the
    # trailing ``internal`` pointer.
    if PY2:
        _fields_.append(("smalltable", c_ssize_t * 2))

    _fields_.append(("internal", c_void_p))
# On PyPy we cannot get buffers so our ability to operate here is
# severely limited.
if pythonapi is not None:

    def get_buffer(obj, writable=False):
        """Return a ctypes ``c_char`` array mapped onto the memory of *obj*.

        :param obj: object passed to ``PyObject_GetBuffer``.
        :param writable: request the buffer with ``PyBUF_WRITABLE`` instead
            of ``PyBUF_SIMPLE``.
        :return: a ``c_char * len`` array created at the buffer's address.
        """
        view = Py_buffer()
        if writable:
            request = PyBUF_WRITABLE
        else:
            request = PyBUF_SIMPLE
        PyObject_GetBuffer(py_object(obj), byref(view), request)
        try:
            # The array is built from the raw address only; the buffer view
            # itself is released before the array is handed back.
            return (c_char * view.len).from_address(view.buf)
        finally:
            PyBuffer_Release(byref(view))


else:
    get_buffer = None
class _WindowsConsoleRawIOBase(io.RawIOBase):
    """Common base for the raw console reader and writer.

    Stores the console handle it is constructed with and always reports
    itself as a tty.
    """

    def __init__(self, handle):
        """Remember *handle* for later console calls."""
        self.handle = handle

    def isatty(self):
        """Return ``True``; the base implementation is invoked first."""
        # The result of the base call is discarded on purpose: it is only
        # invoked for its checks (it raises ValueError on a closed stream).
        io.RawIOBase.isatty(self)
        return True
class _WindowsConsoleReader(_WindowsConsoleRawIOBase):
    """Raw stream that fills buffers with UTF-16-LE data via ``ReadConsoleW``."""

    def readable(self):
        """Report that this stream supports reading."""
        return True

    def readinto(self, b):
        """Read console input into *b* and return the number of bytes stored.

        :param b: writable buffer whose length must be even, because the
            console delivers two-byte code units.
        :return: ``0`` for an empty buffer or when the first byte read is the
            EOF marker, otherwise twice the number of code units read.
        :raises ValueError: if ``len(b)`` is odd.
        :raises OSError: if ``ReadConsoleW`` reports failure.
        """
        requested_bytes = len(b)
        if not requested_bytes:
            return 0
        if requested_bytes % 2:
            raise ValueError(
                "cannot read odd number of bytes from UTF-16-LE encoded console"
            )

        target = get_buffer(b, writable=True)
        units_wanted = requested_bytes // 2
        units_read = c_ulong()

        succeeded = ReadConsoleW(
            HANDLE(self.handle),
            target,
            units_wanted,
            byref(units_read),
            None,
        )
        if GetLastError() == ERROR_OPERATION_ABORTED:
            # wait for KeyboardInterrupt
            time.sleep(0.1)
        if not succeeded:
            raise OSError("Windows error: {}".format(GetLastError()))

        if target[0] == EOF:
            return 0
        return 2 * units_read.value
class _WindowsConsoleWriter(_WindowsConsoleRawIOBase):
    """Raw stream that sends UTF-16-LE data to the console via ``WriteConsoleW``."""

    def writable(self):
        """Report that this stream supports writing."""
        return True

    @staticmethod
    def _get_error_message(errno):
        """Return a readable label for the Windows error code *errno*."""
        if errno == ERROR_SUCCESS:
            return "ERROR_SUCCESS"
        if errno == ERROR_NOT_ENOUGH_MEMORY:
            return "ERROR_NOT_ENOUGH_MEMORY"
        return "Windows error {}".format(errno)

    def write(self, b):
        """Write a prefix of *b* to the console and return the bytes written.

        At most ``MAX_BYTES_WRITTEN`` bytes are submitted per call, counted
        in two-byte code units.

        :raises OSError: if nothing was written although *b* is non-empty.
        """
        total_bytes = len(b)
        source = get_buffer(b)
        units_to_write = min(total_bytes, MAX_BYTES_WRITTEN) // 2
        units_written = c_ulong()

        WriteConsoleW(
            HANDLE(self.handle),
            source,
            units_to_write,
            byref(units_written),
            None,
        )

        written_bytes = 2 * units_written.value
        if written_bytes == 0 and total_bytes > 0:
            raise OSError(self._get_error_message(GetLastError()))
        return written_bytes
class ConsoleStream(object):
    """Pair a text stream with a byte stream behind one file-like object.

    Text is written to the text stream; anything else is written to the byte
    stream, which is also exposed as ``buffer``. Attributes not defined here
    are looked up on the text stream.
    """

    def __init__(self, text_stream, byte_stream):
        self._text_stream = text_stream
        self.buffer = byte_stream

    @property
    def name(self):
        """Name of the underlying byte stream."""
        return self.buffer.name

    def write(self, x):
        """Write *x* to the text stream if it is text, else to ``buffer``."""
        if not isinstance(x, text_type):
            # Best effort: push pending text out before writing raw data;
            # a failing flush must not prevent the write.
            try:
                self.flush()
            except Exception:
                pass
            return self.buffer.write(x)
        return self._text_stream.write(x)

    def writelines(self, lines):
        """Write every item of *lines* through :meth:`write`."""
        for item in lines:
            self.write(item)

    def __getattr__(self, name):
        # Only reached for attributes not found on this object.
        return getattr(self._text_stream, name)

    def isatty(self):
        """Return whatever the byte stream reports for ``isatty()``."""
        return self.buffer.isatty()

    def __repr__(self):
        template = "<ConsoleStream name={!r} encoding={!r}>"
        return template.format(self.name, self.encoding)
class WindowsChunkedWriter(object):
    """
    Transparent proxy around a stream (such as stdout). Every attribute is
    forwarded to the wrapped stream except 'write()', which is replaced so
    that data is written in limited chunks due to a Windows limitation on
    binary console streams.
    """

    def __init__(self, wrapped):
        # double-underscore everything to prevent clashes with names of
        # attributes on the wrapped stream object.
        self.__wrapped = wrapped

    def __getattr__(self, name):
        return getattr(self.__wrapped, name)

    def write(self, text):
        """Write *text* in slices of at most ``MAX_BYTES_WRITTEN`` items."""
        # Slicing clips at the end of ``text``, so the final chunk may be
        # shorter; empty input results in no write at all.
        for start in range(0, len(text), MAX_BYTES_WRITTEN):
            self.__wrapped.write(text[start : start + MAX_BYTES_WRITTEN])
# Names of the ``sys`` streams that have already been wrapped.
_wrapped_std_streams = set()


def _wrap_std_stream(name):
    """Replace ``sys.<name>`` with a chunking proxy, at most once per name.

    Only applies on Python 2 when ``sys.getwindowsversion()`` reports
    version (6, 1) or lower; otherwise nothing happens.
    """
    # Python 2 & Windows 7 and below
    if not PY2:
        return
    if sys.getwindowsversion()[:2] > (6, 1):
        return
    if name in _wrapped_std_streams:
        return

    original_stream = getattr(sys, name)
    setattr(sys, name, WindowsChunkedWriter(original_stream))
    _wrapped_std_streams.add(name)
def _get_text_stdin(buffer_stream):
    """Build a ``ConsoleStream`` that reads text from the console stdin handle.

    :param buffer_stream: byte stream exposed as ``buffer`` on the result.
    """
    raw_reader = _WindowsConsoleReader(STDIN_HANDLE)
    buffered_reader = io.BufferedReader(raw_reader)
    text_stream = _NonClosingTextIOWrapper(
        buffered_reader,
        "utf-16-le",
        "strict",
        line_buffering=True,
    )
    return ConsoleStream(text_stream, buffer_stream)
def _get_text_stdout(buffer_stream):
    """Build a ``ConsoleStream`` that writes text to the console stdout handle.

    :param buffer_stream: byte stream exposed as ``buffer`` on the result.
    """
    raw_writer = _WindowsConsoleWriter(STDOUT_HANDLE)
    buffered_writer = io.BufferedWriter(raw_writer)
    text_stream = _NonClosingTextIOWrapper(
        buffered_writer,
        "utf-16-le",
        "strict",
        line_buffering=True,
    )
    return ConsoleStream(text_stream, buffer_stream)
def _get_text_stderr(buffer_stream):
    """Build a ``ConsoleStream`` that writes text to the console stderr handle.

    :param buffer_stream: byte stream exposed as ``buffer`` on the result.
    """
    raw_writer = _WindowsConsoleWriter(STDERR_HANDLE)
    buffered_writer = io.BufferedWriter(raw_writer)
    text_stream = _NonClosingTextIOWrapper(
        buffered_writer,
        "utf-16-le",
        "strict",
        line_buffering=True,
    )
    return ConsoleStream(text_stream, buffer_stream)
if PY2:

    def _hash_py_argv():
        """Return the CRC32 of ``sys.argv[1:]`` joined with NUL characters."""
        joined = "\x00".join(sys.argv[1:])
        return zlib.crc32(joined)

    # Checksum of the arguments as they were when this module was imported.
    _initial_argv_hash = _hash_py_argv()

    def _get_windows_argv():
        """Return the script arguments parsed from the Windows command line.

        The command line from ``GetCommandLineW`` is split with
        ``CommandLineToArgvW``. Unless ``sys`` has a ``frozen`` attribute,
        the first entry and any following option arguments are dropped; the
        first remaining entry is then dropped as well.

        :raises WindowsError: if ``CommandLineToArgvW`` returns a null result.
        """
        count = c_int(0)
        raw_args = CommandLineToArgvW(GetCommandLineW(), byref(count))
        if not raw_args:
            raise WinError()
        try:
            argv = [raw_args[index] for index in range(count.value)]
        finally:
            # The array was allocated by the API and is released with
            # LocalFree once the strings have been copied out.
            LocalFree(raw_args)
            del raw_args

        if not hasattr(sys, "frozen"):
            argv = argv[1:]
            while argv:
                head = argv[0]
                # A lone "-" or a non-option argument ends option skipping.
                if head == "-" or not head.startswith("-"):
                    break
                argv = argv[1:]
                # Option skipping also stops right after "-c" / "-m".
                if head.startswith(("-c", "-m")):
                    break

        return argv[1:]
# Maps a standard file descriptor number to the factory that builds the
# matching console text stream.
_stream_factories = {
    STDIN_FILENO: _get_text_stdin,
    STDOUT_FILENO: _get_text_stdout,
    STDERR_FILENO: _get_text_stderr,
}
def _is_console(f):
    """Return ``True`` if *f* is attached to a Windows console.

    :param f: any object; it only counts as a console candidate when it has
        a ``fileno()`` method that yields a descriptor.
    :return: ``False`` when *f* has no usable file descriptor, otherwise the
        truth value of ``GetConsoleMode`` for the descriptor's OS handle.
    """
    if not hasattr(f, "fileno"):
        return False

    try:
        fileno = f.fileno()
    except (OSError, io.UnsupportedOperation):
        # In-memory streams raise io.UnsupportedOperation here. On Python 2
        # that class derives from IOError/ValueError rather than OSError, so
        # it has to be listed explicitly.
        return False

    handle = msvcrt.get_osfhandle(fileno)
    # GetConsoleMode only succeeds (non-zero result) for console handles.
    return bool(GetConsoleMode(handle, byref(DWORD())))
def _get_windows_console_stream(f, encoding, errors):
    """Return a console-backed replacement stream for *f*, or ``None``.

    A replacement is only built when buffers can be obtained, *encoding* is
    ``"utf-16-le"`` or ``None``, *errors* is ``"strict"`` or ``None``, *f* is
    a console, and a factory is registered for its file descriptor.
    """
    if get_buffer is None:
        return None
    if encoding not in ("utf-16-le", None):
        return None
    if errors not in ("strict", None):
        return None
    if not _is_console(f):
        return None

    factory = _stream_factories.get(f.fileno())
    if factory is None:
        return None

    if PY2:
        # If we are on Python 2 we need to set the stream that we
        # deal with to binary mode as otherwise the exercise is a
        # bit moot. The same problems apply as for
        # get_binary_stdin and friends from _compat.
        msvcrt.setmode(f.fileno(), os.O_BINARY)
    else:
        # On Python 3 the byte stream is the ``buffer`` attribute of the
        # text stream; without it no replacement can be built.
        f = getattr(f, "buffer", None)
        if f is None:
            return None

    return factory(f)
|