123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225 |
- """Support functions for working with wheel files.
- """
- from __future__ import absolute_import
- import logging
- from email.parser import Parser
- from zipfile import ZipFile
- from pip._vendor.packaging.utils import canonicalize_name
- from pip._vendor.pkg_resources import DistInfoDistribution
- from pip._vendor.six import PY2, ensure_str
- from pip._internal.exceptions import UnsupportedWheel
- from pip._internal.utils.pkg_resources import DictMetadata
- from pip._internal.utils.typing import MYPY_CHECK_RUNNING
- if MYPY_CHECK_RUNNING:
- from email.message import Message
- from typing import Dict, Tuple
- from pip._vendor.pkg_resources import Distribution
- if PY2:
- from zipfile import BadZipfile as BadZipFile
- else:
- from zipfile import BadZipFile
# Newest Wheel-Version, as a (major, minor) tuple, that this module treats as
# fully compatible. check_compatibility() refuses a wheel whose major number
# is greater than this one's and only warns for any other greater version.
VERSION_COMPATIBLE = (1, 0)

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class WheelMetadata(DictMetadata):
    """Dict-backed metadata provider that reports undecodable metadata as
    UnsupportedWheel, naming the wheel the metadata came from.
    """

    def __init__(self, metadata, wheel_name):
        # type: (Dict[str, bytes], str) -> None
        super(WheelMetadata, self).__init__(metadata)
        # Kept only so decoding errors can say which wheel was at fault.
        self._wheel_name = wheel_name

    def get_metadata(self, name):
        # type: (str) -> str
        """Return the metadata stored under ``name``.

        :raises UnsupportedWheel: if the parent lookup raises
            UnicodeDecodeError
        """
        try:
            return super(WheelMetadata, self).get_metadata(name)
        except UnicodeDecodeError as exc:
            # Augment the default error with the origin of the file.
            message = "Error decoding metadata for {}: {}".format(
                self._wheel_name, exc
            )
            raise UnsupportedWheel(message)
def pkg_resources_distribution_for_wheel(wheel_zip, name, location):
    # type: (ZipFile, str, str) -> Distribution
    """Build a pkg_resources distribution from the metadata inside a wheel.

    Every archive entry located under the wheel's .dist-info directory is
    read and stored under its path relative to that directory; the resulting
    mapping backs the returned distribution.

    :raises UnsupportedWheel: on any errors
    """
    info_dir, _ = parse_wheel(wheel_zip, name)
    info_prefix = "{}/".format(info_dir)

    metadata_text = {}  # type: Dict[str, bytes]
    for entry in wheel_zip.namelist():
        if not entry.startswith(info_prefix):
            continue

        # If a flag is set, namelist entries may be unicode in Python 2.
        # Coerce them to the native str type used by the rest of the code;
        # this cannot fail because unicode can always be encoded with UTF-8.
        full_path = ensure_str(entry)
        _, metadata_name = full_path.split("/", 1)

        try:
            contents = read_wheel_metadata_file(wheel_zip, full_path)
        except UnsupportedWheel as exc:
            raise UnsupportedWheel(
                "{} has an invalid wheel, {}".format(name, str(exc))
            )
        metadata_text[metadata_name] = contents

    provider = WheelMetadata(metadata_text, location)
    return DistInfoDistribution(
        location=location, metadata=provider, project_name=name
    )
def parse_wheel(wheel_zip, name):
    # type: (ZipFile, str) -> Tuple[str, Message]
    """Validate the basic structure of a wheel and pull out its key details.

    Locates the .dist-info directory, parses the WHEEL file inside it, parses
    its Wheel-Version and checks that version for compatibility.

    Returns a tuple of the .dist-info directory name and the parsed WHEEL
    metadata.

    :raises UnsupportedWheel: if any of those steps rejects the wheel
    """
    try:
        info_dir = wheel_dist_info_dir(wheel_zip, name)
        metadata = wheel_metadata(wheel_zip, info_dir)
        version = wheel_version(metadata)
    except UnsupportedWheel as exc:
        # Re-raise with the wheel/package name so the user can tell which
        # wheel was rejected.
        message = "{} has an invalid wheel, {}".format(name, str(exc))
        raise UnsupportedWheel(message)

    check_compatibility(version, name)

    return (info_dir, metadata)
def wheel_dist_info_dir(source, name):
    # type: (ZipFile, str) -> str
    """Return the name of the single .dist-info directory in the wheel.

    :param source: the opened wheel archive
    :param name: project name; after canonicalization, the canonicalized
        directory name must start with it
    :raises UnsupportedWheel: if no .dist-info directory is found, if more
        than one is found, or if the one found does not start with the
        canonicalized ``name``
    """
    # Zip file path separators must be /, so the first component of each
    # entry is its top-level name. A set drops the duplicates.
    subdirs = set(p.split("/", 1)[0] for p in source.namelist())

    info_dirs = [s for s in subdirs if s.endswith('.dist-info')]

    if not info_dirs:
        raise UnsupportedWheel(".dist-info directory not found")

    if len(info_dirs) > 1:
        # Set iteration order is arbitrary; sort so the same wheel always
        # produces the same error message.
        raise UnsupportedWheel(
            "multiple .dist-info directories found: {}".format(
                ", ".join(sorted(info_dirs))
            )
        )

    info_dir = info_dirs[0]

    info_dir_name = canonicalize_name(info_dir)
    canonical_name = canonicalize_name(name)
    if not info_dir_name.startswith(canonical_name):
        raise UnsupportedWheel(
            ".dist-info directory {!r} does not start with {!r}".format(
                info_dir, canonical_name
            )
        )

    # Zip file paths can be unicode or str depending on the zip entry flags,
    # so normalize it.
    return ensure_str(info_dir)
def read_wheel_metadata_file(source, path):
    # type: (ZipFile, str) -> bytes
    """Return the raw bytes of the archive entry ``path``.

    :raises UnsupportedWheel: if the entry cannot be read
    """
    try:
        contents = source.read(path)
    # BadZipFile for general corruption, KeyError for missing entry,
    # and RuntimeError for password-protected files
    except (BadZipFile, KeyError, RuntimeError) as exc:
        message = "could not read {!r} file: {!r}".format(path, exc)
        raise UnsupportedWheel(message)
    return contents
def wheel_metadata(source, dist_info_dir):
    # type: (ZipFile, str) -> Message
    """Read and parse the WHEEL file found in the given .dist-info directory.

    Returns the parsed WHEEL metadata as a message object.

    :raises UnsupportedWheel: if the file cannot be read or decoded
    """
    # Zip file path separators must be /
    wheel_path = "{}/WHEEL".format(dist_info_dir)
    raw_contents = read_wheel_metadata_file(source, wheel_path)

    try:
        wheel_text = ensure_str(raw_contents)
    except UnicodeDecodeError as exc:
        raise UnsupportedWheel(
            "error decoding {!r}: {!r}".format(wheel_path, exc)
        )

    # FeedParser (used by Parser) does not raise any exceptions. The returned
    # message may have .defects populated, but for backwards-compatibility we
    # currently ignore them.
    parser = Parser()
    return parser.parsestr(wheel_text)
def wheel_version(wheel_data):
    # type: (Message) -> Tuple[int, ...]
    """Parse the Wheel-Version header of WHEEL metadata into a tuple of ints.

    :raises UnsupportedWheel: if the header is missing or any dot-separated
        component is not an integer
    """
    raw_version = wheel_data["Wheel-Version"]
    if raw_version is None:
        raise UnsupportedWheel("WHEEL is missing Wheel-Version")

    version = raw_version.strip()

    try:
        components = tuple(int(part) for part in version.split('.'))
    except ValueError:
        raise UnsupportedWheel("invalid Wheel-Version: {!r}".format(version))
    return components
def check_compatibility(version, name):
    # type: (Tuple[int, ...], str) -> None
    """Reject or warn about a Wheel-Version newer than the supported one.

    A wheel whose major version is ahead of the compatible one (e.g. 2.0
    when 1.x is supported) is refused outright; a wheel that is newer only
    within the same major series (e.g. 1.2 > 1.1) is accepted with a logged
    warning.

    version: a 2-tuple representing a Wheel-Version (Major, Minor)
    name: name of wheel or package to raise exception about

    :raises UnsupportedWheel: when an incompatible Wheel-Version is given
    """
    compatible_major = VERSION_COMPATIBLE[0]

    if version[0] > compatible_major:
        dotted = '.'.join(str(part) for part in version)
        raise UnsupportedWheel(
            "{}'s Wheel-Version ({}) is not compatible with this version "
            "of pip".format(name, dotted)
        )

    if version > VERSION_COMPATIBLE:
        # Same major series but newer: proceed, and let the user know.
        dotted = '.'.join(str(part) for part in version)
        logger.warning(
            'Installing from a newer Wheel-Version (%s)',
            dotted,
        )
|