req_set.py 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. from __future__ import absolute_import
  2. import logging
  3. from collections import OrderedDict
  4. from pip._vendor.packaging.utils import canonicalize_name
  5. from pip._internal.exceptions import InstallationError
  6. from pip._internal.models.wheel import Wheel
  7. from pip._internal.utils import compatibility_tags
  8. from pip._internal.utils.typing import MYPY_CHECK_RUNNING
  9. if MYPY_CHECK_RUNNING:
  10. from typing import Dict, Iterable, List, Optional, Tuple
  11. from pip._internal.req.req_install import InstallRequirement
  12. logger = logging.getLogger(__name__)
  class RequirementSet(object):
      """Collection of the requirements gathered for an install operation.

      Named requirements live in ``self.requirements``, an insertion-ordered
      mapping keyed by canonicalized project name. Requirements that have no
      name yet are kept separately in ``self.unnamed_requirements``.
      """

      def __init__(self, check_supported_wheels=True):
          # type: (bool) -> None
          """Create a RequirementSet.

          :param check_supported_wheels: When true, ``add_requirement``
              rejects wheel links that are not supported by the tags
              returned from ``compatibility_tags.get_supported()``.
          """
          self.requirements = OrderedDict()  # type: Dict[str, InstallRequirement]  # noqa: E501
          self.check_supported_wheels = check_supported_wheels
          self.unnamed_requirements = []  # type: List[InstallRequirement]

      def __str__(self):
          # type: () -> str
          """Return the space-separated named requirements whose
          ``comes_from`` is falsy, sorted by canonical project name.
          """
          requirements = sorted(
              (req for req in self.requirements.values()
               if not req.comes_from),
              key=lambda req: canonicalize_name(req.name),
          )
          return ' '.join(str(req.req) for req in requirements)

      def __repr__(self):
          # type: () -> str
          """Return a debug string listing every named requirement, sorted
          by canonical project name, together with their count.
          """
          requirements = sorted(
              self.requirements.values(),
              key=lambda req: canonicalize_name(req.name),
          )
          format_string = (
              '<{classname} object; {count} requirement(s): {reqs}>'
          )
          return format_string.format(
              classname=self.__class__.__name__,
              count=len(requirements),
              reqs=', '.join(str(req.req) for req in requirements),
          )

      def add_unnamed_requirement(self, install_req):
          # type: (InstallRequirement) -> None
          """Append a requirement that has no name to the unnamed list."""
          assert not install_req.name
          self.unnamed_requirements.append(install_req)

      def add_named_requirement(self, install_req):
          # type: (InstallRequirement) -> None
          """Store a named requirement under its canonicalized name.

          An entry already stored under the same canonical name is replaced.
          """
          assert install_req.name
          project_name = canonicalize_name(install_req.name)
          self.requirements[project_name] = install_req

      def add_requirement(
          self,
          install_req,  # type: InstallRequirement
          parent_req_name=None,  # type: Optional[str]
          extras_requested=None  # type: Optional[Iterable[str]]
      ):
          # type: (...) -> Tuple[List[InstallRequirement], Optional[InstallRequirement]]  # noqa: E501
          """Add install_req as a requirement to install.

          :param parent_req_name: The name of the requirement that needed
              this added. The name is used because when multiple unnamed
              requirements resolve to the same name, we could otherwise end
              up with dependency links that point outside the Requirements
              set. parent_req must already be added. Note that None implies
              that this is a user supplied requirement, vs an inferred one.
          :param extras_requested: an iterable of extras used to evaluate
              the environment markers.
          :return: A 2-tuple ``(to_scan, req)``. ``to_scan`` lists the
              additional requirements to scan and ``req`` is the requirement
              held by this set under that name, if any:

              * ``([], None)`` when the markers do not match;
              * ``([install_req], None)`` for an unnamed requirement;
              * ``([install_req], install_req)`` when the name was not yet
                present and the requirement has just been added;
              * ``([], existing_req)`` when the name is already present and
                either install_req is a constraint or the existing entry is
                not a constraint;
              * ``([existing_req], existing_req)`` when an existing
                constraint has been turned into a real requirement.
          :raises InstallationError: if the link is an unsupported wheel
              (and wheel checking is enabled), if a conflicting requirement
              for the same name was already given, or if a link-based
              requirement cannot satisfy an existing constraint.
          """
          # If the markers do not match, ignore this requirement.
          if not install_req.match_markers(extras_requested):
              logger.info(
                  "Ignoring %s: markers '%s' don't match your environment",
                  install_req.name, install_req.markers,
              )
              return [], None

          # If the wheel is not supported, raise an error.
          # Should check this after filtering out based on environment
          # markers to allow specifying different wheels based on the
          # environment/OS, in a single requirements file.
          if install_req.link and install_req.link.is_wheel:
              # The Wheel is always built so its filename is still parsed
              # here; the supported tags are only looked up when the check
              # is actually enabled.
              wheel = Wheel(install_req.link.filename)
              if (self.check_supported_wheels and
                      not wheel.supported(
                          compatibility_tags.get_supported())):
                  raise InstallationError(
                      "{} is not a supported wheel on this platform.".format(
                          wheel.filename)
                  )

          # This next bit is really a sanity check.
          assert not install_req.user_supplied or parent_req_name is None, (
              "a user supplied req shouldn't have a parent"
          )

          # Unnamed requirements are scanned again and the requirement won't
          # be added as a dependency until after scanning.
          if not install_req.name:
              self.add_unnamed_requirement(install_req)
              return [install_req], None

          try:
              existing_req = self.get_requirement(
                  install_req.name)  # type: Optional[InstallRequirement]
          except KeyError:
              existing_req = None

          has_conflicting_requirement = (
              parent_req_name is None and
              existing_req and
              not existing_req.constraint and
              existing_req.extras == install_req.extras and
              existing_req.req.specifier != install_req.req.specifier
          )
          if has_conflicting_requirement:
              raise InstallationError(
                  "Double requirement given: {} (already in {}, name={!r})"
                  .format(install_req, existing_req, install_req.name)
              )

          # When no existing requirement exists, add the requirement as a
          # dependency and it will be scanned again after.
          if not existing_req:
              self.add_named_requirement(install_req)
              # We'd want to rescan this requirement later
              return [install_req], install_req

          # Assume there's no need to scan, and that we've already
          # encountered this for scanning.
          if install_req.constraint or not existing_req.constraint:
              return [], existing_req

          # From here on: existing_req is a constraint and install_req is a
          # real requirement for the same name.
          does_not_satisfy_constraint = (
              install_req.link and
              not (
                  existing_req.link and
                  install_req.link.path == existing_req.link.path
              )
          )
          if does_not_satisfy_constraint:
              raise InstallationError(
                  "Could not satisfy constraints for '{}': "
                  "installation from path or url cannot be "
                  "constrained to a version".format(install_req.name)
              )

          # The existing entry was only a constraint so far; mark it for
          # real installation.
          existing_req.constraint = False
          # If we're now installing a user supplied requirement,
          # mark the existing object as such.
          if install_req.user_supplied:
              existing_req.user_supplied = True
          # Merge the extras of both requirements into a sorted tuple.
          existing_req.extras = tuple(sorted(
              set(existing_req.extras) | set(install_req.extras)
          ))
          logger.debug(
              "Setting %s extras to: %s",
              existing_req, existing_req.extras,
          )
          # Return the existing requirement for addition to the parent and
          # scanning again.
          return [existing_req], existing_req

      def has_requirement(self, name):
          # type: (str) -> bool
          """Return whether a non-constraint requirement is stored for
          ``name`` (compared by canonicalized project name).
          """
          project_name = canonicalize_name(name)
          return (
              project_name in self.requirements and
              not self.requirements[project_name].constraint
          )

      def get_requirement(self, name):
          # type: (str) -> InstallRequirement
          """Return the requirement stored for ``name``.

          The lookup uses the canonicalized project name and, unlike
          ``has_requirement``, also returns constraint entries.

          :raises KeyError: if no requirement is stored under that name.
          """
          project_name = canonicalize_name(name)
          if project_name in self.requirements:
              return self.requirements[project_name]

          raise KeyError(
              "No project with the name {name!r}".format(name=name)
          )

      @property
      def all_requirements(self):
          # type: () -> List[InstallRequirement]
          """Return a new list: unnamed requirements first, then the named
          ones in insertion order.
          """
          return self.unnamed_requirements + list(self.requirements.values())