123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272 |
- """Utilities related to archives.
- """
- # The following comment should be removed at some point in the future.
- # mypy: strict-optional=False
- # mypy: disallow-untyped-defs=False
- from __future__ import absolute_import
- import logging
- import os
- import shutil
- import stat
- import tarfile
- import zipfile
- from pip._internal.exceptions import InstallationError
- from pip._internal.utils.filetypes import (
- BZ2_EXTENSIONS,
- TAR_EXTENSIONS,
- XZ_EXTENSIONS,
- ZIP_EXTENSIONS,
- )
- from pip._internal.utils.misc import ensure_dir
- from pip._internal.utils.typing import MYPY_CHECK_RUNNING
- if MYPY_CHECK_RUNNING:
- from typing import Iterable, List, Optional, Text, Union
# Logger for this module, keyed on the module's import name.
logger = logging.getLogger(__name__)

# Start from the zip and tar extensions; the bz2 and xz extensions are
# appended below only if the matching compression module can be imported.
SUPPORTED_EXTENSIONS = ZIP_EXTENSIONS + TAR_EXTENSIONS

try:
    import bz2  # noqa
except ImportError:
    logger.debug('bz2 module is not available')
else:
    SUPPORTED_EXTENSIONS += BZ2_EXTENSIONS

try:
    # Only for Python 3.3+
    import lzma  # noqa
except ImportError:
    logger.debug('lzma module is not available')
else:
    SUPPORTED_EXTENSIONS += XZ_EXTENSIONS
def current_umask():
    # type: () -> int
    """Return the umask currently in effect for this process.

    ``os.umask`` only reports the mask as a side effect of replacing it, so
    the mask is set to 0 for a moment and then put back to the value that
    was read.
    """
    previous = os.umask(0)
    # Restore the mask that was active before the probe above.
    os.umask(previous)
    return previous
def split_leading_dir(path):
    # type: (Union[str, Text]) -> List[Union[str, Text]]
    """Split ``path`` into its first component and the remainder.

    Leading ``/`` and ``\\`` characters are removed first, in any order or
    mixture. The path is then split once at whichever of the two separators
    occurs earliest.

    Returns a two-element list ``[leading, rest]``; ``rest`` is ``''`` when
    the path contains no separator.
    """
    # Strip both separator kinds as one character set so that a mixed
    # prefix such as '\\/' cannot leave a stray separator behind.
    path = path.lstrip('/\\')
    slash_at = path.find('/')
    backslash_at = path.find('\\')
    if slash_at != -1 and (backslash_at == -1 or slash_at < backslash_at):
        return path.split('/', 1)
    if backslash_at != -1:
        return path.split('\\', 1)
    return [path, '']
def has_leading_dir(paths):
    # type: (Iterable[Union[str, Text]]) -> bool
    """Tell whether every path starts with one and the same directory name.

    This is the case when everything in an archive lives under a single
    top-level subdirectory. A path whose leading component is empty makes
    the answer False; an iterable with no paths yields True.
    """
    shared = None
    for entry in paths:
        head = split_leading_dir(entry)[0]
        if not head:
            return False
        if shared is None:
            # First path seen: its leading component is the reference.
            shared = head
            continue
        if head != shared:
            return False
    return True
def is_within_directory(directory, target):
    # type: ((Union[str, Text]), (Union[str, Text])) -> bool
    """
    Return true if the absolute path of target is within the directory

    Both arguments are made absolute (and normalised) with
    ``os.path.abspath``. ``target`` counts as inside when it is the
    directory itself or lies below it. The comparison is made on whole path
    components, so ``/foo/barbaz`` is *not* considered to be within
    ``/foo/bar`` even though the two strings share a character prefix.
    """
    abs_directory = os.path.abspath(directory)
    abs_target = os.path.abspath(target)
    if abs_target == abs_directory:
        return True
    # Terminate the directory with a separator before the prefix test so
    # that only complete path components can match. A filesystem root
    # already ends with the separator and must not get a second one.
    if abs_directory.endswith(os.sep):
        dir_prefix = abs_directory
    else:
        dir_prefix = abs_directory + os.sep
    return abs_target.startswith(dir_prefix)
def unzip_file(filename, location, flatten=True):
    # type: (str, str, bool) -> None
    """
    Unzip the file (with path `filename`) to the destination `location`.  All
    files are written based on system defaults and umask (i.e. permissions are
    not preserved), except that regular file members with any execute
    permissions (user, group, or world) have "chmod +x" applied after being
    written. Note that for windows, any execute changes using os.chmod are
    no-ops per the python docs.

    When `flatten` is true and every member shares one leading directory,
    that directory is dropped from the extracted paths.

    Raises InstallationError if a member's destination path is not within
    `location`.
    """
    ensure_dir(location)
    with open(filename, 'rb') as zipfp:
        with zipfile.ZipFile(zipfp, allowZip64=True) as archive:
            leading = has_leading_dir(archive.namelist()) and flatten
            for info in archive.infolist():
                name = info.filename
                fn = name
                if leading:
                    fn = split_leading_dir(name)[1]
                fn = os.path.join(location, fn)
                if not is_within_directory(location, fn):
                    message = (
                        'The zip file ({}) has a file ({}) trying to install '
                        'outside target directory ({})'
                    )
                    raise InstallationError(
                        message.format(filename, fn, location)
                    )
                if fn.endswith(('/', '\\')):
                    # A directory
                    ensure_dir(fn)
                    continue
                ensure_dir(os.path.dirname(fn))
                # Don't use read() to avoid allocating an arbitrarily large
                # chunk of memory for the file's content
                with archive.open(name) as srcfp, open(fn, 'wb') as destfp:
                    shutil.copyfileobj(srcfp, destfp)
                # The upper 16 bits of external_attr hold the member's mode
                # bits; the value is 0 when the archive recorded none.
                mode = info.external_attr >> 16
                # if mode and regular file and any execute permissions for
                # user/group/world?
                if mode and stat.S_ISREG(mode) and mode & 0o111:
                    # make dest file have execute for user/group/world
                    # (chmod +x) no-op on windows per python docs
                    os.chmod(fn, (0o777 - current_umask() | 0o111))
def untar_file(filename, location):
    # type: (str, str) -> None
    """
    Untar the file (with path `filename`) to the destination `location`.
    All files are written based on system defaults and umask (i.e. permissions
    are not preserved), except that regular file members with any execute
    permissions (user, group, or world) have "chmod +x" applied after being
    written.  Note that for windows, any execute changes using os.chmod are
    no-ops per the python docs.

    The compression mode is chosen from the file name's extension; when the
    extension is not recognised a warning is logged and tarfile is asked to
    detect the compression itself. If every member shares one leading
    directory, that directory is dropped from the extracted paths. Members
    that cannot be read are logged and skipped.

    Raises InstallationError if a member's destination path is not within
    `location`.
    """
    ensure_dir(location)
    lowered = filename.lower()
    if lowered.endswith(('.gz', '.tgz')):
        mode = 'r:gz'
    elif lowered.endswith(BZ2_EXTENSIONS):
        mode = 'r:bz2'
    elif lowered.endswith(XZ_EXTENSIONS):
        mode = 'r:xz'
    elif lowered.endswith('.tar'):
        mode = 'r'
    else:
        logger.warning(
            'Cannot determine compression type for file %s', filename,
        )
        mode = 'r:*'
    with tarfile.open(filename, mode) as tar:
        members = tar.getmembers()
        leading = has_leading_dir([member.name for member in members])
        for member in members:
            fn = member.name
            if leading:
                # https://github.com/python/mypy/issues/1174
                fn = split_leading_dir(fn)[1]  # type: ignore
            path = os.path.join(location, fn)
            if not is_within_directory(location, path):
                message = (
                    'The tar file ({}) has a file ({}) trying to install '
                    'outside target directory ({})'
                )
                raise InstallationError(
                    message.format(filename, path, location)
                )
            if member.isdir():
                ensure_dir(path)
            elif member.issym():
                # NOTE(review): only the link's own location is checked
                # above; the link *target* (member.linkname) is not
                # validated here - confirm whether that is acceptable.
                try:
                    # https://github.com/python/typeshed/issues/2673
                    tar._extract_member(member, path)  # type: ignore
                except Exception as exc:
                    # Some corrupt tar files seem to produce this
                    # (specifically bad symlinks)
                    logger.warning(
                        'In the tar file %s the member %s is invalid: %s',
                        filename, member.name, exc,
                    )
                    continue
            else:
                try:
                    fp = tar.extractfile(member)
                except (KeyError, AttributeError) as exc:
                    # Some corrupt tar files seem to produce this
                    # (specifically bad symlinks)
                    logger.warning(
                        'In the tar file %s the member %s is invalid: %s',
                        filename, member.name, exc,
                    )
                    continue
                if fp is None:
                    # extractfile() returns None for members that are
                    # neither regular files nor links (e.g. devices or
                    # FIFOs); there is no content to copy, so skip them
                    # instead of failing inside copyfileobj().
                    logger.warning(
                        'In the tar file %s the member %s is invalid: %s',
                        filename, member.name,
                        'not a regular file or link',
                    )
                    continue
                ensure_dir(os.path.dirname(path))
                # Close the member stream even if writing the file fails.
                try:
                    with open(path, 'wb') as destfp:
                        shutil.copyfileobj(fp, destfp)
                finally:
                    fp.close()
                # Update the timestamp (useful for cython compiled files)
                # https://github.com/python/typeshed/issues/2673
                tar.utime(member, path)  # type: ignore
                # member have any execute permissions for user/group/world?
                if member.mode & 0o111:
                    # make dest file have execute for user/group/world
                    # no-op on windows per python docs
                    os.chmod(path, (0o777 - current_umask() | 0o111))
def unpack_file(
    filename,  # type: str
    location,  # type: str
    content_type=None,  # type: Optional[str]
):
    # type: (...) -> None
    """Unpack the archive at `filename` into the directory `location`.

    `filename` is first resolved with ``os.path.realpath``. The archive is
    treated as a zip file when `content_type` is ``application/zip``, when
    its name ends with a zip extension, or when ``zipfile.is_zipfile``
    accepts it; a leading directory is flattened unless the name ends with
    ``.whl``. Otherwise it is treated as a tar file when `content_type` is
    ``application/x-gzip``, when ``tarfile.is_tarfile`` accepts it, or when
    its name ends with a tar, bz2 or xz extension.

    Raises InstallationError when neither format matches.
    """
    filename = os.path.realpath(filename)
    if (
        content_type == 'application/zip' or
        filename.lower().endswith(ZIP_EXTENSIONS) or
        zipfile.is_zipfile(filename)
    ):
        unzip_file(
            filename,
            location,
            flatten=not filename.endswith('.whl')
        )
    elif (
        content_type == 'application/x-gzip' or
        tarfile.is_tarfile(filename) or
        filename.lower().endswith(
            TAR_EXTENSIONS + BZ2_EXTENSIONS + XZ_EXTENSIONS
        )
    ):
        untar_file(filename, location)
    else:
        # FIXME: handle?
        # FIXME: magic signatures?
        # `location` is the destination directory, not the download source,
        # so the messages identify the archive by `filename`.
        logger.critical(
            'Cannot unpack file %s (to %s, content-type: %s); '
            'cannot detect archive format',
            filename, location, content_type,
        )
        raise InstallationError(
            'Cannot determine archive format of {}'.format(filename)
        )
|