|
- """Support for installing and building the "wheel" binary package format.
- """
- from __future__ import absolute_import
- import collections
- import compileall
- import contextlib
- import csv
- import importlib
- import logging
- import os.path
- import re
- import shutil
- import sys
- import warnings
- from base64 import urlsafe_b64encode
- from itertools import chain, starmap
- from zipfile import ZipFile
- from pip._vendor import pkg_resources
- from pip._vendor.distlib.scripts import ScriptMaker
- from pip._vendor.distlib.util import get_export_entry
- from pip._vendor.six import (
- PY2,
- ensure_str,
- ensure_text,
- itervalues,
- reraise,
- text_type,
- )
- from pip._vendor.six.moves import filterfalse, map
- from pip._internal.exceptions import InstallationError
- from pip._internal.locations import get_major_minor_version
- from pip._internal.models.direct_url import DIRECT_URL_METADATA_NAME, DirectUrl
- from pip._internal.models.scheme import SCHEME_KEYS
- from pip._internal.utils.filesystem import adjacent_tmp_file, replace
- from pip._internal.utils.misc import (
- captured_stdout,
- ensure_dir,
- hash_file,
- partition,
- )
- from pip._internal.utils.typing import MYPY_CHECK_RUNNING
- from pip._internal.utils.unpacking import (
- current_umask,
- is_within_directory,
- set_extracted_file_to_default_mode_plus_executable,
- zip_item_is_executable,
- )
- from pip._internal.utils.wheel import (
- parse_wheel,
- pkg_resources_distribution_for_wheel,
- )
- # Use the custom cast function at runtime to make cast work,
- # and import typing.cast when performing pre-commit and type
- # checks
- if not MYPY_CHECK_RUNNING:
- from pip._internal.utils.typing import cast
- else:
- from email.message import Message
- from typing import (
- Any,
- Callable,
- Dict,
- IO,
- Iterable,
- Iterator,
- List,
- NewType,
- Optional,
- Protocol,
- Sequence,
- Set,
- Tuple,
- Union,
- cast,
- )
- from zipfile import ZipInfo
- from pip._vendor.pkg_resources import Distribution
- from pip._internal.models.scheme import Scheme
- from pip._internal.utils.filesystem import NamedTemporaryFileResult
- RecordPath = NewType('RecordPath', text_type)
- InstalledCSVRow = Tuple[RecordPath, str, Union[int, str]]
- class File(Protocol):
- src_record_path = None # type: RecordPath
- dest_path = None # type: text_type
- changed = None # type: bool
- def save(self):
- # type: () -> None
- pass
- logger = logging.getLogger(__name__)
- def rehash(path, blocksize=1 << 20):
- # type: (text_type, int) -> Tuple[str, str]
- """Return (encoded_digest, length) for path using hashlib.sha256()"""
- h, length = hash_file(path, blocksize)
- digest = 'sha256=' + urlsafe_b64encode(
- h.digest()
- ).decode('latin1').rstrip('=')
- # unicode/str python2 issues
- return (digest, str(length)) # type: ignore
- def csv_io_kwargs(mode):
- # type: (str) -> Dict[str, Any]
- """Return keyword arguments to properly open a CSV file
- in the given mode.
- """
- if PY2:
- return {'mode': '{}b'.format(mode)}
- else:
- return {'mode': mode, 'newline': '', 'encoding': 'utf-8'}
- def fix_script(path):
- # type: (text_type) -> bool
- """Replace #!python with #!/path/to/python
- Return True if file was changed.
- """
- # XXX RECORD hashes will need to be updated
- assert os.path.isfile(path)
- with open(path, 'rb') as script:
- firstline = script.readline()
- if not firstline.startswith(b'#!python'):
- return False
- exename = sys.executable.encode(sys.getfilesystemencoding())
- firstline = b'#!' + exename + os.linesep.encode("ascii")
- rest = script.read()
- with open(path, 'wb') as script:
- script.write(firstline)
- script.write(rest)
- return True
- def wheel_root_is_purelib(metadata):
- # type: (Message) -> bool
- return metadata.get("Root-Is-Purelib", "").lower() == "true"
- def get_entrypoints(distribution):
- # type: (Distribution) -> Tuple[Dict[str, str], Dict[str, str]]
- # get the entry points and then the script names
- try:
- console = distribution.get_entry_map('console_scripts')
- gui = distribution.get_entry_map('gui_scripts')
- except KeyError:
- # Our dict-based Distribution raises KeyError if entry_points.txt
- # doesn't exist.
- return {}, {}
- def _split_ep(s):
- # type: (pkg_resources.EntryPoint) -> Tuple[str, str]
- """get the string representation of EntryPoint,
- remove space and split on '='
- """
- split_parts = str(s).replace(" ", "").split("=")
- return split_parts[0], split_parts[1]
- # convert the EntryPoint objects into strings with module:function
- console = dict(_split_ep(v) for v in console.values())
- gui = dict(_split_ep(v) for v in gui.values())
- return console, gui
- def message_about_scripts_not_on_PATH(scripts):
- # type: (Sequence[str]) -> Optional[str]
- """Determine if any scripts are not on PATH and format a warning.
- Returns a warning message if one or more scripts are not on PATH,
- otherwise None.
- """
- if not scripts:
- return None
- # Group scripts by the path they were installed in
- grouped_by_dir = collections.defaultdict(set) # type: Dict[str, Set[str]]
- for destfile in scripts:
- parent_dir = os.path.dirname(destfile)
- script_name = os.path.basename(destfile)
- grouped_by_dir[parent_dir].add(script_name)
- # We don't want to warn for directories that are on PATH.
- not_warn_dirs = [
- os.path.normcase(i).rstrip(os.sep) for i in
- os.environ.get("PATH", "").split(os.pathsep)
- ]
- # If an executable sits with sys.executable, we don't warn for it.
- # This covers the case of venv invocations without activating the venv.
- not_warn_dirs.append(os.path.normcase(os.path.dirname(sys.executable)))
- warn_for = {
- parent_dir: scripts for parent_dir, scripts in grouped_by_dir.items()
- if os.path.normcase(parent_dir) not in not_warn_dirs
- } # type: Dict[str, Set[str]]
- if not warn_for:
- return None
- # Format a message
- msg_lines = []
- for parent_dir, dir_scripts in warn_for.items():
- sorted_scripts = sorted(dir_scripts) # type: List[str]
- if len(sorted_scripts) == 1:
- start_text = "script {} is".format(sorted_scripts[0])
- else:
- start_text = "scripts {} are".format(
- ", ".join(sorted_scripts[:-1]) + " and " + sorted_scripts[-1]
- )
- msg_lines.append(
- "The {} installed in '{}' which is not on PATH."
- .format(start_text, parent_dir)
- )
- last_line_fmt = (
- "Consider adding {} to PATH or, if you prefer "
- "to suppress this warning, use --no-warn-script-location."
- )
- if len(msg_lines) == 1:
- msg_lines.append(last_line_fmt.format("this directory"))
- else:
- msg_lines.append(last_line_fmt.format("these directories"))
- # Add a note if any directory starts with ~
- warn_for_tilde = any(
- i[0] == "~" for i in os.environ.get("PATH", "").split(os.pathsep) if i
- )
- if warn_for_tilde:
- tilde_warning_msg = (
- "NOTE: The current PATH contains path(s) starting with `~`, "
- "which may not be expanded by all applications."
- )
- msg_lines.append(tilde_warning_msg)
- # Returns the formatted multiline message
- return "\n".join(msg_lines)
- def _normalized_outrows(outrows):
- # type: (Iterable[InstalledCSVRow]) -> List[Tuple[str, str, str]]
- """Normalize the given rows of a RECORD file.
- Items in each row are converted into str. Rows are then sorted to make
- the value more predictable for tests.
- Each row is a 3-tuple (path, hash, size) and corresponds to a record of
- a RECORD file (see PEP 376 and PEP 427 for details). For the rows
- passed to this function, the size can be an integer as an int or string,
- or the empty string.
- """
- # Normally, there should only be one row per path, in which case the
- # second and third elements don't come into play when sorting.
- # However, in cases in the wild where a path might happen to occur twice,
- # we don't want the sort operation to trigger an error (but still want
- # determinism). Since the third element can be an int or string, we
- # coerce each element to a string to avoid a TypeError in this case.
- # For additional background, see--
- # https://github.com/pypa/pip/issues/5868
- return sorted(
- (ensure_str(record_path, encoding='utf-8'), hash_, str(size))
- for record_path, hash_, size in outrows
- )
- def _record_to_fs_path(record_path):
- # type: (RecordPath) -> text_type
- return record_path
- def _fs_to_record_path(path, relative_to=None):
- # type: (text_type, Optional[text_type]) -> RecordPath
- if relative_to is not None:
- # On Windows, do not handle relative paths if they belong to different
- # logical disks
- if os.path.splitdrive(path)[0].lower() == \
- os.path.splitdrive(relative_to)[0].lower():
- path = os.path.relpath(path, relative_to)
- path = path.replace(os.path.sep, '/')
- return cast('RecordPath', path)
- def _parse_record_path(record_column):
- # type: (str) -> RecordPath
- p = ensure_text(record_column, encoding='utf-8')
- return cast('RecordPath', p)
- def get_csv_rows_for_installed(
- old_csv_rows, # type: List[List[str]]
- installed, # type: Dict[RecordPath, RecordPath]
- changed, # type: Set[RecordPath]
- generated, # type: List[str]
- lib_dir, # type: str
- ):
- # type: (...) -> List[InstalledCSVRow]
- """
- :param installed: A map from archive RECORD path to installation RECORD
- path.
- """
- installed_rows = [] # type: List[InstalledCSVRow]
- for row in old_csv_rows:
- if len(row) > 3:
- logger.warning('RECORD line has more than three elements: %s', row)
- old_record_path = _parse_record_path(row[0])
- new_record_path = installed.pop(old_record_path, old_record_path)
- if new_record_path in changed:
- digest, length = rehash(_record_to_fs_path(new_record_path))
- else:
- digest = row[1] if len(row) > 1 else ''
- length = row[2] if len(row) > 2 else ''
- installed_rows.append((new_record_path, digest, length))
- for f in generated:
- path = _fs_to_record_path(f, lib_dir)
- digest, length = rehash(f)
- installed_rows.append((path, digest, length))
- for installed_record_path in itervalues(installed):
- installed_rows.append((installed_record_path, '', ''))
- return installed_rows
- def get_console_script_specs(console):
- # type: (Dict[str, str]) -> List[str]
- """
- Given the mapping from entrypoint name to callable, return the relevant
- console script specs.
- """
- # Don't mutate caller's version
- console = console.copy()
- scripts_to_generate = []
- # Special case pip and setuptools to generate versioned wrappers
- #
- # The issue is that some projects (specifically, pip and setuptools) use
- # code in setup.py to create "versioned" entry points - pip2.7 on Python
- # 2.7, pip3.3 on Python 3.3, etc. But these entry points are baked into
- # the wheel metadata at build time, and so if the wheel is installed with
- # a *different* version of Python the entry points will be wrong. The
- # correct fix for this is to enhance the metadata to be able to describe
- # such versioned entry points, but that won't happen till Metadata 2.0 is
- # available.
- # In the meantime, projects using versioned entry points will either have
- # incorrect versioned entry points, or they will not be able to distribute
- # "universal" wheels (i.e., they will need a wheel per Python version).
- #
- # Because setuptools and pip are bundled with _ensurepip and virtualenv,
- # we need to use universal wheels. So, as a stopgap until Metadata 2.0, we
- # override the versioned entry points in the wheel and generate the
- # correct ones. This code is purely a short-term measure until Metadata 2.0
- # is available.
- #
- # To add the level of hack in this section of code, in order to support
- # ensurepip this code will look for an ``ENSUREPIP_OPTIONS`` environment
- # variable which will control which version scripts get installed.
- #
- # ENSUREPIP_OPTIONS=altinstall
- # - Only pipX.Y and easy_install-X.Y will be generated and installed
- # ENSUREPIP_OPTIONS=install
- # - pipX.Y, pipX, easy_install-X.Y will be generated and installed. Note
- # that this option is technically if ENSUREPIP_OPTIONS is set and is
- # not altinstall
- # DEFAULT
- # - The default behavior is to install pip, pipX, pipX.Y, easy_install
- # and easy_install-X.Y.
- pip_script = console.pop('pip', None)
- if pip_script:
- if "ENSUREPIP_OPTIONS" not in os.environ:
- scripts_to_generate.append('pip = ' + pip_script)
- if os.environ.get("ENSUREPIP_OPTIONS", "") != "altinstall":
- scripts_to_generate.append(
- 'pip{} = {}'.format(sys.version_info[0], pip_script)
- )
- scripts_to_generate.append(
- 'pip{} = {}'.format(get_major_minor_version(), pip_script)
- )
- # Delete any other versioned pip entry points
- pip_ep = [k for k in console if re.match(r'pip(\d(\.\d)?)?$', k)]
- for k in pip_ep:
- del console[k]
- easy_install_script = console.pop('easy_install', None)
- if easy_install_script:
- if "ENSUREPIP_OPTIONS" not in os.environ:
- scripts_to_generate.append(
- 'easy_install = ' + easy_install_script
- )
- scripts_to_generate.append(
- 'easy_install-{} = {}'.format(
- get_major_minor_version(), easy_install_script
- )
- )
- # Delete any other versioned easy_install entry points
- easy_install_ep = [
- k for k in console if re.match(r'easy_install(-\d\.\d)?$', k)
- ]
- for k in easy_install_ep:
- del console[k]
- # Generate the console entry points specified in the wheel
- scripts_to_generate.extend(starmap('{} = {}'.format, console.items()))
- return scripts_to_generate
- class ZipBackedFile(object):
- def __init__(self, src_record_path, dest_path, zip_file):
- # type: (RecordPath, text_type, ZipFile) -> None
- self.src_record_path = src_record_path
- self.dest_path = dest_path
- self._zip_file = zip_file
- self.changed = False
- def _getinfo(self):
- # type: () -> ZipInfo
- if not PY2:
- return self._zip_file.getinfo(self.src_record_path)
- # Python 2 does not expose a way to detect a ZIP's encoding, but the
- # wheel specification (PEP 427) explicitly mandates that paths should
- # use UTF-8, so we assume it is true.
- return self._zip_file.getinfo(self.src_record_path.encode("utf-8"))
- def save(self):
- # type: () -> None
- # directory creation is lazy and after file filtering
- # to ensure we don't install empty dirs; empty dirs can't be
- # uninstalled.
- parent_dir = os.path.dirname(self.dest_path)
- ensure_dir(parent_dir)
- # When we open the output file below, any existing file is truncated
- # before we start writing the new contents. This is fine in most
- # cases, but can cause a segfault if pip has loaded a shared
- # object (e.g. from pyopenssl through its vendored urllib3)
- # Since the shared object is mmap'd an attempt to call a
- # symbol in it will then cause a segfault. Unlinking the file
- # allows writing of new contents while allowing the process to
- # continue to use the old copy.
- if os.path.exists(self.dest_path):
- os.unlink(self.dest_path)
- zipinfo = self._getinfo()
- with self._zip_file.open(zipinfo) as f:
- with open(self.dest_path, "wb") as dest:
- shutil.copyfileobj(f, dest)
- if zip_item_is_executable(zipinfo):
- set_extracted_file_to_default_mode_plus_executable(self.dest_path)
- class ScriptFile(object):
- def __init__(self, file):
- # type: (File) -> None
- self._file = file
- self.src_record_path = self._file.src_record_path
- self.dest_path = self._file.dest_path
- self.changed = False
- def save(self):
- # type: () -> None
- self._file.save()
- self.changed = fix_script(self.dest_path)
- class MissingCallableSuffix(InstallationError):
- def __init__(self, entry_point):
- # type: (str) -> None
- super(MissingCallableSuffix, self).__init__(
- "Invalid script entry point: {} - A callable "
- "suffix is required. Cf https://packaging.python.org/"
- "specifications/entry-points/#use-for-scripts for more "
- "information.".format(entry_point)
- )
- def _raise_for_invalid_entrypoint(specification):
- # type: (str) -> None
- entry = get_export_entry(specification)
- if entry is not None and entry.suffix is None:
- raise MissingCallableSuffix(str(entry))
- class PipScriptMaker(ScriptMaker):
- def make(self, specification, options=None):
- # type: (str, Dict[str, Any]) -> List[str]
- _raise_for_invalid_entrypoint(specification)
- return super(PipScriptMaker, self).make(specification, options)
- def _install_wheel(
- name, # type: str
- wheel_zip, # type: ZipFile
- wheel_path, # type: str
- scheme, # type: Scheme
- pycompile=True, # type: bool
- warn_script_location=True, # type: bool
- direct_url=None, # type: Optional[DirectUrl]
- requested=False, # type: bool
- ):
- # type: (...) -> None
- """Install a wheel.
- :param name: Name of the project to install
- :param wheel_zip: open ZipFile for wheel being installed
- :param scheme: Distutils scheme dictating the install directories
- :param req_description: String used in place of the requirement, for
- logging
- :param pycompile: Whether to byte-compile installed Python files
- :param warn_script_location: Whether to check that scripts are installed
- into a directory on PATH
- :raises UnsupportedWheel:
- * when the directory holds an unpacked wheel with incompatible
- Wheel-Version
- * when the .dist-info dir does not match the wheel
- """
- info_dir, metadata = parse_wheel(wheel_zip, name)
- if wheel_root_is_purelib(metadata):
- lib_dir = scheme.purelib
- else:
- lib_dir = scheme.platlib
- # Record details of the files moved
- # installed = files copied from the wheel to the destination
- # changed = files changed while installing (scripts #! line typically)
- # generated = files newly generated during the install (script wrappers)
- installed = {} # type: Dict[RecordPath, RecordPath]
- changed = set() # type: Set[RecordPath]
- generated = [] # type: List[str]
- def record_installed(srcfile, destfile, modified=False):
- # type: (RecordPath, text_type, bool) -> None
- """Map archive RECORD paths to installation RECORD paths."""
- newpath = _fs_to_record_path(destfile, lib_dir)
- installed[srcfile] = newpath
- if modified:
- changed.add(_fs_to_record_path(destfile))
- def all_paths():
- # type: () -> Iterable[RecordPath]
- names = wheel_zip.namelist()
- # If a flag is set, names may be unicode in Python 2. We convert to
- # text explicitly so these are valid for lookup in RECORD.
- decoded_names = map(ensure_text, names)
- for name in decoded_names:
- yield cast("RecordPath", name)
- def is_dir_path(path):
- # type: (RecordPath) -> bool
- return path.endswith("/")
- def assert_no_path_traversal(dest_dir_path, target_path):
- # type: (text_type, text_type) -> None
- if not is_within_directory(dest_dir_path, target_path):
- message = (
- "The wheel {!r} has a file {!r} trying to install"
- " outside the target directory {!r}"
- )
- raise InstallationError(
- message.format(wheel_path, target_path, dest_dir_path)
- )
- def root_scheme_file_maker(zip_file, dest):
- # type: (ZipFile, text_type) -> Callable[[RecordPath], File]
- def make_root_scheme_file(record_path):
- # type: (RecordPath) -> File
- normed_path = os.path.normpath(record_path)
- dest_path = os.path.join(dest, normed_path)
- assert_no_path_traversal(dest, dest_path)
- return ZipBackedFile(record_path, dest_path, zip_file)
- return make_root_scheme_file
- def data_scheme_file_maker(zip_file, scheme):
- # type: (ZipFile, Scheme) -> Callable[[RecordPath], File]
- scheme_paths = {}
- for key in SCHEME_KEYS:
- encoded_key = ensure_text(key)
- scheme_paths[encoded_key] = ensure_text(
- getattr(scheme, key), encoding=sys.getfilesystemencoding()
- )
- def make_data_scheme_file(record_path):
- # type: (RecordPath) -> File
- normed_path = os.path.normpath(record_path)
- try:
- _, scheme_key, dest_subpath = normed_path.split(os.path.sep, 2)
- except ValueError:
- message = (
- "Unexpected file in {}: {!r}. .data directory contents"
- " should be named like: '<scheme key>/<path>'."
- ).format(wheel_path, record_path)
- raise InstallationError(message)
- try:
- scheme_path = scheme_paths[scheme_key]
- except KeyError:
- valid_scheme_keys = ", ".join(sorted(scheme_paths))
- message = (
- "Unknown scheme key used in {}: {} (for file {!r}). .data"
- " directory contents should be in subdirectories named"
- " with a valid scheme key ({})"
- ).format(
- wheel_path, scheme_key, record_path, valid_scheme_keys
- )
- raise InstallationError(message)
- dest_path = os.path.join(scheme_path, dest_subpath)
- assert_no_path_traversal(scheme_path, dest_path)
- return ZipBackedFile(record_path, dest_path, zip_file)
- return make_data_scheme_file
- def is_data_scheme_path(path):
- # type: (RecordPath) -> bool
- return path.split("/", 1)[0].endswith(".data")
- paths = all_paths()
- file_paths = filterfalse(is_dir_path, paths)
- root_scheme_paths, data_scheme_paths = partition(
- is_data_scheme_path, file_paths
- )
- make_root_scheme_file = root_scheme_file_maker(
- wheel_zip,
- ensure_text(lib_dir, encoding=sys.getfilesystemencoding()),
- )
- files = map(make_root_scheme_file, root_scheme_paths)
- def is_script_scheme_path(path):
- # type: (RecordPath) -> bool
- parts = path.split("/", 2)
- return (
- len(parts) > 2 and
- parts[0].endswith(".data") and
- parts[1] == "scripts"
- )
- other_scheme_paths, script_scheme_paths = partition(
- is_script_scheme_path, data_scheme_paths
- )
- make_data_scheme_file = data_scheme_file_maker(wheel_zip, scheme)
- other_scheme_files = map(make_data_scheme_file, other_scheme_paths)
- files = chain(files, other_scheme_files)
- # Get the defined entry points
- distribution = pkg_resources_distribution_for_wheel(
- wheel_zip, name, wheel_path
- )
- console, gui = get_entrypoints(distribution)
- def is_entrypoint_wrapper(file):
- # type: (File) -> bool
- # EP, EP.exe and EP-script.py are scripts generated for
- # entry point EP by setuptools
- path = file.dest_path
- name = os.path.basename(path)
- if name.lower().endswith('.exe'):
- matchname = name[:-4]
- elif name.lower().endswith('-script.py'):
- matchname = name[:-10]
- elif name.lower().endswith(".pya"):
- matchname = name[:-4]
- else:
- matchname = name
- # Ignore setuptools-generated scripts
- return (matchname in console or matchname in gui)
- script_scheme_files = map(make_data_scheme_file, script_scheme_paths)
- script_scheme_files = filterfalse(
- is_entrypoint_wrapper, script_scheme_files
- )
- script_scheme_files = map(ScriptFile, script_scheme_files)
- files = chain(files, script_scheme_files)
- for file in files:
- file.save()
- record_installed(file.src_record_path, file.dest_path, file.changed)
- def pyc_source_file_paths():
- # type: () -> Iterator[text_type]
- # We de-duplicate installation paths, since there can be overlap (e.g.
- # file in .data maps to same location as file in wheel root).
- # Sorting installation paths makes it easier to reproduce and debug
- # issues related to permissions on existing files.
- for installed_path in sorted(set(installed.values())):
- full_installed_path = os.path.join(lib_dir, installed_path)
- if not os.path.isfile(full_installed_path):
- continue
- if not full_installed_path.endswith('.py'):
- continue
- yield full_installed_path
- def pyc_output_path(path):
- # type: (text_type) -> text_type
- """Return the path the pyc file would have been written to.
- """
- if PY2:
- if sys.flags.optimize:
- return path + 'o'
- else:
- return path + 'c'
- else:
- return importlib.util.cache_from_source(path)
- # Compile all of the pyc files for the installed files
- if pycompile:
- with captured_stdout() as stdout:
- with warnings.catch_warnings():
- warnings.filterwarnings('ignore')
- for path in pyc_source_file_paths():
- # Python 2's `compileall.compile_file` requires a str in
- # error cases, so we must convert to the native type.
- path_arg = ensure_str(
- path, encoding=sys.getfilesystemencoding()
- )
- success = compileall.compile_file(
- path_arg, force=True, quiet=True
- )
- if success:
- pyc_path = pyc_output_path(path)
- assert os.path.exists(pyc_path)
- pyc_record_path = cast(
- "RecordPath", pyc_path.replace(os.path.sep, "/")
- )
- record_installed(pyc_record_path, pyc_path)
- logger.debug(stdout.getvalue())
- maker = PipScriptMaker(None, scheme.scripts)
- # Ensure old scripts are overwritten.
- # See https://github.com/pypa/pip/issues/1800
- maker.clobber = True
- # Ensure we don't generate any variants for scripts because this is almost
- # never what somebody wants.
- # See https://bitbucket.org/pypa/distlib/issue/35/
- maker.variants = {''}
- # This is required because otherwise distlib creates scripts that are not
- # executable.
- # See https://bitbucket.org/pypa/distlib/issue/32/
- maker.set_mode = True
- # Generate the console and GUI entry points specified in the wheel
- scripts_to_generate = get_console_script_specs(console)
- gui_scripts_to_generate = list(starmap('{} = {}'.format, gui.items()))
- generated_console_scripts = maker.make_multiple(scripts_to_generate)
- generated.extend(generated_console_scripts)
- generated.extend(
- maker.make_multiple(gui_scripts_to_generate, {'gui': True})
- )
- if warn_script_location:
- msg = message_about_scripts_not_on_PATH(generated_console_scripts)
- if msg is not None:
- logger.warning(msg)
- generated_file_mode = 0o666 & ~current_umask()
- @contextlib.contextmanager
- def _generate_file(path, **kwargs):
- # type: (str, **Any) -> Iterator[NamedTemporaryFileResult]
- with adjacent_tmp_file(path, **kwargs) as f:
- yield f
- os.chmod(f.name, generated_file_mode)
- replace(f.name, path)
- dest_info_dir = os.path.join(lib_dir, info_dir)
- # Record pip as the installer
- installer_path = os.path.join(dest_info_dir, 'INSTALLER')
- with _generate_file(installer_path) as installer_file:
- installer_file.write(b'pip\n')
- generated.append(installer_path)
- # Record the PEP 610 direct URL reference
- if direct_url is not None:
- direct_url_path = os.path.join(dest_info_dir, DIRECT_URL_METADATA_NAME)
- with _generate_file(direct_url_path) as direct_url_file:
- direct_url_file.write(direct_url.to_json().encode("utf-8"))
- generated.append(direct_url_path)
- # Record the REQUESTED file
- if requested:
- requested_path = os.path.join(dest_info_dir, 'REQUESTED')
- with open(requested_path, "w"):
- pass
- generated.append(requested_path)
- record_text = distribution.get_metadata('RECORD')
- record_rows = list(csv.reader(record_text.splitlines()))
- rows = get_csv_rows_for_installed(
- record_rows,
- installed=installed,
- changed=changed,
- generated=generated,
- lib_dir=lib_dir)
- # Record details of all files installed
- record_path = os.path.join(dest_info_dir, 'RECORD')
- with _generate_file(record_path, **csv_io_kwargs('w')) as record_file:
- # The type mypy infers for record_file is different for Python 3
- # (typing.IO[Any]) and Python 2 (typing.BinaryIO). We explicitly
- # cast to typing.IO[str] as a workaround.
- writer = csv.writer(cast('IO[str]', record_file))
- writer.writerows(_normalized_outrows(rows))
- @contextlib.contextmanager
- def req_error_context(req_description):
- # type: (str) -> Iterator[None]
- try:
- yield
- except InstallationError as e:
- message = "For req: {}. {}".format(req_description, e.args[0])
- reraise(
- InstallationError, InstallationError(message), sys.exc_info()[2]
- )
- def install_wheel(
- name, # type: str
- wheel_path, # type: str
- scheme, # type: Scheme
- req_description, # type: str
- pycompile=True, # type: bool
- warn_script_location=True, # type: bool
- direct_url=None, # type: Optional[DirectUrl]
- requested=False, # type: bool
- ):
- # type: (...) -> None
- with ZipFile(wheel_path, allowZip64=True) as z:
- with req_error_context(req_description):
- _install_wheel(
- name=name,
- wheel_zip=z,
- wheel_path=wheel_path,
- scheme=scheme,
- pycompile=pycompile,
- warn_script_location=warn_script_location,
- direct_url=direct_url,
- requested=requested,
- )
|