123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151 |
- # The following comment should be removed at some point in the future.
- # mypy: strict-optional=False
- from __future__ import absolute_import
- import contextlib
- import errno
- import hashlib
- import logging
- import os
- from pip._vendor import contextlib2
- from pip._internal.utils.temp_dir import TempDirectory
- from pip._internal.utils.typing import MYPY_CHECK_RUNNING
- if MYPY_CHECK_RUNNING:
- from types import TracebackType
- from typing import Dict, Iterator, Optional, Set, Type, Union
- from pip._internal.req.req_install import InstallRequirement
- from pip._internal.models.link import Link
- logger = logging.getLogger(__name__)
@contextlib.contextmanager
def update_env_context_manager(**changes):
    # type: (str) -> Iterator[None]
    """Temporarily apply ``changes`` to ``os.environ``.

    Each keyword argument names an environment variable and gives the value
    it should have while the ``with`` body runs. On exit, every variable
    that was changed is put back to its previous value, or removed if it
    did not exist beforehand.

    If applying one of the changes fails (for example because a value is
    not a string), the variables that were already changed are restored
    before the error propagates.
    """
    target = os.environ

    # Sentinel distinguishing "variable was not set" from any real value.
    non_existent_marker = object()
    saved_values = {}  # type: Dict[str, Union[object, str]]

    try:
        # Apply the changes inside the ``try`` so that a failure part-way
        # through still restores whatever has already been modified.
        for name, new_value in changes.items():
            previous_value = target.get(name, non_existent_marker)
            target[name] = new_value
            # Record only once the assignment succeeded: a variable that
            # was never changed must not be touched during restoration.
            saved_values[name] = previous_value
        yield
    finally:
        # Restore original values in the target.
        for name, original_value in saved_values.items():
            if original_value is non_existent_marker:
                # ``pop`` tolerates the body having already unset the
                # variable, so the remaining variables still get restored.
                target.pop(name, None)
            else:
                assert isinstance(original_value, str)  # for mypy
                target[name] = original_value
@contextlib.contextmanager
def get_requirement_tracker():
    # type: () -> Iterator[RequirementTracker]
    """Yield a ``RequirementTracker`` rooted at ``PIP_REQ_TRACKER``.

    When the ``PIP_REQ_TRACKER`` environment variable is not set, a
    temporary directory is created, used as the tracker root and exported
    through ``PIP_REQ_TRACKER`` for the duration of the context. When the
    variable is already set, its value is used as the root as-is.
    """
    tracker_root = os.environ.get('PIP_REQ_TRACKER')
    with contextlib2.ExitStack() as stack:
        if tracker_root is None:
            # No tracker directory was provided: create one and publish it
            # through the environment. Both are undone when the stack exits.
            temp_dir = stack.enter_context(TempDirectory(kind='req-tracker'))
            tracker_root = temp_dir.path
            env_override = update_env_context_manager(
                PIP_REQ_TRACKER=tracker_root
            )
            stack.enter_context(env_override)
            logger.debug("Initialized build tracking at %s", tracker_root)

        with RequirementTracker(tracker_root) as tracker:
            yield tracker
class RequirementTracker(object):
    """Track in-progress builds using marker files under a root directory.

    Each tracked requirement is represented by one file inside ``root``.
    The file is named after the SHA-224 hex digest of the requirement
    link's URL (without fragment) and contains ``str(req)``. The presence
    of a readable file for a link means that link is already being built.

    The tracker is a context manager; leaving the ``with`` block removes
    every requirement that this instance is still tracking.
    """

    def __init__(self, root):
        # type: (str) -> None
        """Create a tracker storing its marker files in ``root``."""
        self._root = root
        # Requirements added through this instance and not yet removed.
        self._entries = set()  # type: Set[InstallRequirement]
        logger.debug("Created build tracker: %s", self._root)

    def __enter__(self):
        # type: () -> RequirementTracker
        logger.debug("Entered build tracker: %s", self._root)
        return self

    def __exit__(
        self,
        exc_type,  # type: Optional[Type[BaseException]]
        exc_val,  # type: Optional[BaseException]
        exc_tb  # type: Optional[TracebackType]
    ):
        # type: (...) -> None
        # Returns None, so an exception raised in the body propagates.
        self.cleanup()

    def _entry_path(self, link):
        # type: (Link) -> str
        """Return the path of the marker file used for ``link``."""
        hashed = hashlib.sha224(link.url_without_fragment.encode()).hexdigest()
        return os.path.join(self._root, hashed)

    def add(self, req):
        # type: (InstallRequirement) -> None
        """Add an InstallRequirement to build tracking.

        :raises LookupError: if a marker file for ``req.link`` already
            exists and can be read, i.e. the link is already being built.
        :raises IOError: if the marker file cannot be read for any reason
            other than not existing.
        """
        # Get the file to write information about this requirement.
        entry_path = self._entry_path(req.link)

        # Try reading from the file. If it exists and can be read from, a build
        # is already in progress, so a LookupError is raised.
        try:
            with open(entry_path) as fp:
                contents = fp.read()
        except IOError as e:
            # if the error is anything other than "file does not exist", raise.
            if e.errno != errno.ENOENT:
                raise
        else:
            message = '{} is already being built: {}'.format(
                req.link, contents)
            raise LookupError(message)

        # If we're here, req should really not be building already.
        assert req not in self._entries

        # Start tracking this requirement.
        with open(entry_path, 'w') as fp:
            fp.write(str(req))
        self._entries.add(req)

        logger.debug('Added %s to build tracker %r', req, self._root)

    def remove(self, req):
        # type: (InstallRequirement) -> None
        """Remove an InstallRequirement from build tracking."""
        # Delete the created file and the corresponding entries.
        os.unlink(self._entry_path(req.link))
        self._entries.remove(req)

        logger.debug('Removed %s from build tracker %r', req, self._root)

    def cleanup(self):
        # type: () -> None
        """Remove every requirement still tracked by this instance."""
        # Iterate over a copy: remove() mutates self._entries.
        for req in set(self._entries):
            self.remove(req)

        logger.debug("Removed build tracker: %r", self._root)

    @contextlib.contextmanager
    def track(self, req):
        # type: (InstallRequirement) -> Iterator[None]
        """Track ``req`` for the duration of the ``with`` body.

        The requirement is removed again when the body finishes, whether it
        completes normally or raises. If ``add`` itself fails, nothing is
        removed.
        """
        self.add(req)
        try:
            yield
        finally:
            # Without this, a failed build would leave its marker behind
            # until cleanup(), and retrying the same requirement would be
            # reported as "already being built".
            self.remove(req)
|