# req_set.py (7.6 KB)
  1. # The following comment should be removed at some point in the future.
  2. # mypy: strict-optional=False
  3. from __future__ import absolute_import
  4. import logging
  5. from collections import OrderedDict
  6. from pip._vendor.packaging.utils import canonicalize_name
  7. from pip._internal.exceptions import InstallationError
  8. from pip._internal.models.wheel import Wheel
  9. from pip._internal.utils import compatibility_tags
  10. from pip._internal.utils.typing import MYPY_CHECK_RUNNING
  11. if MYPY_CHECK_RUNNING:
  12. from typing import Dict, Iterable, List, Optional, Tuple
  13. from pip._internal.req.req_install import InstallRequirement
  14. logger = logging.getLogger(__name__)
  15. class RequirementSet(object):
  16. def __init__(self, check_supported_wheels=True):
  17. # type: (bool) -> None
  18. """Create a RequirementSet.
  19. """
  20. self.requirements = OrderedDict() # type: Dict[str, InstallRequirement] # noqa: E501
  21. self.check_supported_wheels = check_supported_wheels
  22. self.unnamed_requirements = [] # type: List[InstallRequirement]
  23. def __str__(self):
  24. # type: () -> str
  25. requirements = sorted(
  26. (req for req in self.requirements.values() if not req.comes_from),
  27. key=lambda req: canonicalize_name(req.name),
  28. )
  29. return ' '.join(str(req.req) for req in requirements)
  30. def __repr__(self):
  31. # type: () -> str
  32. requirements = sorted(
  33. self.requirements.values(),
  34. key=lambda req: canonicalize_name(req.name),
  35. )
  36. format_string = '<{classname} object; {count} requirement(s): {reqs}>'
  37. return format_string.format(
  38. classname=self.__class__.__name__,
  39. count=len(requirements),
  40. reqs=', '.join(str(req.req) for req in requirements),
  41. )
  42. def add_unnamed_requirement(self, install_req):
  43. # type: (InstallRequirement) -> None
  44. assert not install_req.name
  45. self.unnamed_requirements.append(install_req)
  46. def add_named_requirement(self, install_req):
  47. # type: (InstallRequirement) -> None
  48. assert install_req.name
  49. project_name = canonicalize_name(install_req.name)
  50. self.requirements[project_name] = install_req
  51. def add_requirement(
  52. self,
  53. install_req, # type: InstallRequirement
  54. parent_req_name=None, # type: Optional[str]
  55. extras_requested=None # type: Optional[Iterable[str]]
  56. ):
  57. # type: (...) -> Tuple[List[InstallRequirement], Optional[InstallRequirement]] # noqa: E501
  58. """Add install_req as a requirement to install.
  59. :param parent_req_name: The name of the requirement that needed this
  60. added. The name is used because when multiple unnamed requirements
  61. resolve to the same name, we could otherwise end up with dependency
  62. links that point outside the Requirements set. parent_req must
  63. already be added. Note that None implies that this is a user
  64. supplied requirement, vs an inferred one.
  65. :param extras_requested: an iterable of extras used to evaluate the
  66. environment markers.
  67. :return: Additional requirements to scan. That is either [] if
  68. the requirement is not applicable, or [install_req] if the
  69. requirement is applicable and has just been added.
  70. """
  71. # If the markers do not match, ignore this requirement.
  72. if not install_req.match_markers(extras_requested):
  73. logger.info(
  74. "Ignoring %s: markers '%s' don't match your environment",
  75. install_req.name, install_req.markers,
  76. )
  77. return [], None
  78. # If the wheel is not supported, raise an error.
  79. # Should check this after filtering out based on environment markers to
  80. # allow specifying different wheels based on the environment/OS, in a
  81. # single requirements file.
  82. if install_req.link and install_req.link.is_wheel:
  83. wheel = Wheel(install_req.link.filename)
  84. tags = compatibility_tags.get_supported()
  85. if (self.check_supported_wheels and not wheel.supported(tags)):
  86. raise InstallationError(
  87. "{} is not a supported wheel on this platform.".format(
  88. wheel.filename)
  89. )
  90. # This next bit is really a sanity check.
  91. assert install_req.is_direct == (parent_req_name is None), (
  92. "a direct req shouldn't have a parent and also, "
  93. "a non direct req should have a parent"
  94. )
  95. # Unnamed requirements are scanned again and the requirement won't be
  96. # added as a dependency until after scanning.
  97. if not install_req.name:
  98. self.add_unnamed_requirement(install_req)
  99. return [install_req], None
  100. try:
  101. existing_req = self.get_requirement(install_req.name)
  102. except KeyError:
  103. existing_req = None
  104. has_conflicting_requirement = (
  105. parent_req_name is None and
  106. existing_req and
  107. not existing_req.constraint and
  108. existing_req.extras == install_req.extras and
  109. existing_req.req.specifier != install_req.req.specifier
  110. )
  111. if has_conflicting_requirement:
  112. raise InstallationError(
  113. "Double requirement given: {} (already in {}, name={!r})"
  114. .format(install_req, existing_req, install_req.name)
  115. )
  116. # When no existing requirement exists, add the requirement as a
  117. # dependency and it will be scanned again after.
  118. if not existing_req:
  119. self.add_named_requirement(install_req)
  120. # We'd want to rescan this requirement later
  121. return [install_req], install_req
  122. # Assume there's no need to scan, and that we've already
  123. # encountered this for scanning.
  124. if install_req.constraint or not existing_req.constraint:
  125. return [], existing_req
  126. does_not_satisfy_constraint = (
  127. install_req.link and
  128. not (
  129. existing_req.link and
  130. install_req.link.path == existing_req.link.path
  131. )
  132. )
  133. if does_not_satisfy_constraint:
  134. raise InstallationError(
  135. "Could not satisfy constraints for '{}': "
  136. "installation from path or url cannot be "
  137. "constrained to a version".format(install_req.name)
  138. )
  139. # If we're now installing a constraint, mark the existing
  140. # object for real installation.
  141. existing_req.constraint = False
  142. existing_req.extras = tuple(sorted(
  143. set(existing_req.extras) | set(install_req.extras)
  144. ))
  145. logger.debug(
  146. "Setting %s extras to: %s",
  147. existing_req, existing_req.extras,
  148. )
  149. # Return the existing requirement for addition to the parent and
  150. # scanning again.
  151. return [existing_req], existing_req
  152. def has_requirement(self, name):
  153. # type: (str) -> bool
  154. project_name = canonicalize_name(name)
  155. return (
  156. project_name in self.requirements and
  157. not self.requirements[project_name].constraint
  158. )
  159. def get_requirement(self, name):
  160. # type: (str) -> InstallRequirement
  161. project_name = canonicalize_name(name)
  162. if project_name in self.requirements:
  163. return self.requirements[project_name]
  164. raise KeyError("No project with the name {name!r}".format(**locals()))
  165. @property
  166. def all_requirements(self):
  167. # type: () -> List[InstallRequirement]
  168. return self.unnamed_requirements + list(self.requirements.values())