123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174 |
- import functools
- import logging
- from pip._vendor import six
- from pip._vendor.packaging.utils import canonicalize_name
- from pip._vendor.resolvelib import BaseReporter, ResolutionImpossible
- from pip._vendor.resolvelib import Resolver as RLResolver
- from pip._internal.exceptions import InstallationError
- from pip._internal.req.req_set import RequirementSet
- from pip._internal.resolution.base import BaseResolver
- from pip._internal.resolution.resolvelib.provider import PipProvider
- from pip._internal.utils.typing import MYPY_CHECK_RUNNING
- from .factory import Factory
- if MYPY_CHECK_RUNNING:
- from typing import Dict, List, Optional, Tuple
- from pip._vendor.resolvelib.resolvers import Result
- from pip._internal.cache import WheelCache
- from pip._internal.index.package_finder import PackageFinder
- from pip._internal.operations.prepare import RequirementPreparer
- from pip._internal.req.req_install import InstallRequirement
- from pip._internal.resolution.base import InstallRequirementProvider
- logger = logging.getLogger(__name__)
class Resolver(BaseResolver):
    """Resolver that delegates dependency resolution to ``resolvelib``.

    A ``Factory`` is built once at construction time. Each ``resolve()``
    call wraps it in a ``PipProvider`` and hands that to resolvelib's
    ``Resolver``. The last successful resolution result is kept on
    ``self._result`` so ``get_installation_order()`` can walk its graph.
    """

    def __init__(
        self,
        preparer,  # type: RequirementPreparer
        finder,  # type: PackageFinder
        wheel_cache,  # type: Optional[WheelCache]
        make_install_req,  # type: InstallRequirementProvider
        use_user_site,  # type: bool
        ignore_dependencies,  # type: bool
        ignore_installed,  # type: bool
        ignore_requires_python,  # type: bool
        force_reinstall,  # type: bool
        upgrade_strategy,  # type: str
        py_version_info=None,  # type: Optional[Tuple[int, ...]]
    ):
        """Store the resolver configuration.

        ``finder``, ``preparer``, ``make_install_req``, ``force_reinstall``,
        ``ignore_installed``, ``ignore_requires_python`` and
        ``py_version_info`` are forwarded to ``Factory``.
        ``ignore_dependencies`` is kept for the provider created in
        ``resolve()``.

        ``wheel_cache``, ``use_user_site`` and ``upgrade_strategy`` are
        accepted but not used anywhere in this class.
        """
        super(Resolver, self).__init__()
        self.factory = Factory(
            finder=finder,
            preparer=preparer,
            make_install_req=make_install_req,
            force_reinstall=force_reinstall,
            ignore_installed=ignore_installed,
            ignore_requires_python=ignore_requires_python,
            py_version_info=py_version_info,
        )
        self.ignore_dependencies = ignore_dependencies
        # Populated by resolve(); read by get_installation_order().
        self._result = None  # type: Optional[Result]

    def resolve(self, root_reqs, check_supported_wheels):
        # type: (List[InstallRequirement], bool) -> RequirementSet
        """Resolve ``root_reqs`` and return the resulting requirement set.

        The returned ``RequirementSet`` holds one install requirement for
        every resolved candidate for which the provider yields one;
        candidates with no install requirement are skipped.

        Raises ``InstallationError`` if any root requirement is a
        constraint, or if resolvelib reports the resolution as impossible.
        """
        # FIXME: Implement constraints.
        if any(r.constraint for r in root_reqs):
            raise InstallationError("Constraints are not yet supported.")

        provider = PipProvider(
            factory=self.factory,
            ignore_dependencies=self.ignore_dependencies,
        )
        reporter = BaseReporter()
        resolver = RLResolver(provider, reporter)

        requirements = [
            self.factory.make_requirement_from_install_req(r)
            for r in root_reqs
        ]

        try:
            self._result = resolver.resolve(requirements)
        except ResolutionImpossible as e:
            error = self.factory.get_installation_error(e)
            if not error:
                # TODO: This needs fixing, we need to look at the
                # factory.get_installation_error infrastructure, as that
                # doesn't really allow for the logger.critical calls I'm
                # using here.
                for req, parent in e.causes:
                    if parent is None:
                        origin = ""
                    else:
                        origin = " (from {})".format(parent.name)
                    logger.critical(
                        "Could not find a version that satisfies "
                        "the requirement %s%s",
                        req,
                        origin,
                    )
                # Chain to the resolver failure, like the branch below does.
                six.raise_from(
                    InstallationError(
                        "No matching distribution found for " +
                        ", ".join([r.name for r, _ in e.causes])
                    ),
                    e,
                )
            six.raise_from(error, e)

        req_set = RequirementSet(check_supported_wheels=check_supported_wheels)
        for candidate in self._result.mapping.values():
            ireq = provider.get_install_requirement(candidate)
            if ireq is None:
                # No install requirement for this candidate; nothing to add.
                continue
            ireq.should_reinstall = self.factory.should_reinstall(candidate)
            req_set.add_named_requirement(ireq)

        return req_set

    def get_installation_order(self, req_set):
        # type: (RequirementSet) -> List[InstallRequirement]
        """Create a list that orders given requirements for installation.

        The returned list should contain all requirements in ``req_set``,
        so the caller can loop through it and have a requirement installed
        before the requiring thing.

        The current implementation walks the resolved dependency graph, and
        makes sure every node has a greater "weight" than all its parents.
        Requirements are then returned in descending weight order, with the
        canonical name as tie-breaker.

        Raises ``InstallationError`` if a full pass over the graph assigns
        no new weight, which is treated as a circular dependency.
        """
        assert self._result is not None, "must call resolve() first"
        weights = {}  # type: Dict[Optional[str], int]

        graph = self._result.graph
        key_count = len(self._result.mapping) + 1  # Packages plus sentinel.
        while len(weights) < key_count:
            progressed = False
            for key in graph:
                if key in weights:
                    continue
                parents = list(graph.iter_parents(key))
                # A node can only be weighted once all its parents are.
                if not all(p in weights for p in parents):
                    continue
                if parents:
                    weight = max(weights[p] for p in parents) + 1
                else:
                    weight = 0
                weights[key] = weight
                progressed = True

            # FIXME: This check will fail if there are unbreakable cycles.
            # Implement something to forcefully break them up to continue.
            if not progressed:
                raise InstallationError(
                    "Could not determine installation order due to circular "
                    "dependency."
                )

        sorted_items = sorted(
            req_set.requirements.items(),
            key=functools.partial(_req_set_item_sorter, weights=weights),
            reverse=True,
        )
        return [ireq for _, ireq in sorted_items]
def _req_set_item_sorter(
    item,  # type: Tuple[str, InstallRequirement]
    weights,  # type: Dict[Optional[str], int]
):
    # type: (...) -> Tuple[int, str]
    """Build the sort key for one ``(name, install requirement)`` pair.

    The key is ``(weight, canonical_name)``. The weight is looked up in
    ``weights`` (the mapping computed by ``get_installation_order()``)
    under the canonicalized form of the item's name. The canonical name
    is the second element so that items of equal weight sort
    predictably, which is useful in tests.
    """
    canonical = canonicalize_name(item[0])
    sort_key = (weights[canonical], canonical)
    return sort_key
|