temp_dir.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  1. from __future__ import absolute_import
  2. import errno
  3. import itertools
  4. import logging
  5. import os.path
  6. import tempfile
  7. from contextlib import contextmanager
  8. from pip._vendor.contextlib2 import ExitStack
  9. from pip._internal.utils.misc import enum, rmtree
  10. from pip._internal.utils.typing import MYPY_CHECK_RUNNING
  11. if MYPY_CHECK_RUNNING:
  12. from typing import Any, Dict, Iterator, Optional, TypeVar, Union
  13. _T = TypeVar('_T', bound='TempDirectory')
  14. logger = logging.getLogger(__name__)
  15. # Kinds of temporary directories. Only needed for ones that are
  16. # globally-managed.
  17. tempdir_kinds = enum(
  18. BUILD_ENV="build-env",
  19. EPHEM_WHEEL_CACHE="ephem-wheel-cache",
  20. REQ_BUILD="req-build",
  21. )
  22. _tempdir_manager = None # type: Optional[ExitStack]
  23. @contextmanager
  24. def global_tempdir_manager():
  25. # type: () -> Iterator[None]
  26. global _tempdir_manager
  27. with ExitStack() as stack:
  28. old_tempdir_manager, _tempdir_manager = _tempdir_manager, stack
  29. try:
  30. yield
  31. finally:
  32. _tempdir_manager = old_tempdir_manager
  33. class TempDirectoryTypeRegistry(object):
  34. """Manages temp directory behavior
  35. """
  36. def __init__(self):
  37. # type: () -> None
  38. self._should_delete = {} # type: Dict[str, bool]
  39. def set_delete(self, kind, value):
  40. # type: (str, bool) -> None
  41. """Indicate whether a TempDirectory of the given kind should be
  42. auto-deleted.
  43. """
  44. self._should_delete[kind] = value
  45. def get_delete(self, kind):
  46. # type: (str) -> bool
  47. """Get configured auto-delete flag for a given TempDirectory type,
  48. default True.
  49. """
  50. return self._should_delete.get(kind, True)
  51. _tempdir_registry = None # type: Optional[TempDirectoryTypeRegistry]
  52. @contextmanager
  53. def tempdir_registry():
  54. # type: () -> Iterator[TempDirectoryTypeRegistry]
  55. """Provides a scoped global tempdir registry that can be used to dictate
  56. whether directories should be deleted.
  57. """
  58. global _tempdir_registry
  59. old_tempdir_registry = _tempdir_registry
  60. _tempdir_registry = TempDirectoryTypeRegistry()
  61. try:
  62. yield _tempdir_registry
  63. finally:
  64. _tempdir_registry = old_tempdir_registry
  65. class _Default(object):
  66. pass
  67. _default = _Default()
  68. class TempDirectory(object):
  69. """Helper class that owns and cleans up a temporary directory.
  70. This class can be used as a context manager or as an OO representation of a
  71. temporary directory.
  72. Attributes:
  73. path
  74. Location to the created temporary directory
  75. delete
  76. Whether the directory should be deleted when exiting
  77. (when used as a contextmanager)
  78. Methods:
  79. cleanup()
  80. Deletes the temporary directory
  81. When used as a context manager, if the delete attribute is True, on
  82. exiting the context the temporary directory is deleted.
  83. """
  84. def __init__(
  85. self,
  86. path=None, # type: Optional[str]
  87. delete=_default, # type: Union[bool, None, _Default]
  88. kind="temp", # type: str
  89. globally_managed=False, # type: bool
  90. ):
  91. super(TempDirectory, self).__init__()
  92. if delete is _default:
  93. if path is not None:
  94. # If we were given an explicit directory, resolve delete option
  95. # now.
  96. delete = False
  97. else:
  98. # Otherwise, we wait until cleanup and see what
  99. # tempdir_registry says.
  100. delete = None
  101. if path is None:
  102. path = self._create(kind)
  103. self._path = path
  104. self._deleted = False
  105. self.delete = delete
  106. self.kind = kind
  107. if globally_managed:
  108. assert _tempdir_manager is not None
  109. _tempdir_manager.enter_context(self)
  110. @property
  111. def path(self):
  112. # type: () -> str
  113. assert not self._deleted, (
  114. "Attempted to access deleted path: {}".format(self._path)
  115. )
  116. return self._path
  117. def __repr__(self):
  118. # type: () -> str
  119. return "<{} {!r}>".format(self.__class__.__name__, self.path)
  120. def __enter__(self):
  121. # type: (_T) -> _T
  122. return self
  123. def __exit__(self, exc, value, tb):
  124. # type: (Any, Any, Any) -> None
  125. if self.delete is not None:
  126. delete = self.delete
  127. elif _tempdir_registry:
  128. delete = _tempdir_registry.get_delete(self.kind)
  129. else:
  130. delete = True
  131. if delete:
  132. self.cleanup()
  133. def _create(self, kind):
  134. # type: (str) -> str
  135. """Create a temporary directory and store its path in self.path
  136. """
  137. # We realpath here because some systems have their default tmpdir
  138. # symlinked to another directory. This tends to confuse build
  139. # scripts, so we canonicalize the path by traversing potential
  140. # symlinks here.
  141. path = os.path.realpath(
  142. tempfile.mkdtemp(prefix="pip-{}-".format(kind))
  143. )
  144. logger.debug("Created temporary directory: {}".format(path))
  145. return path
  146. def cleanup(self):
  147. # type: () -> None
  148. """Remove the temporary directory created and reset state
  149. """
  150. self._deleted = True
  151. if os.path.exists(self._path):
  152. rmtree(self._path)
  153. class AdjacentTempDirectory(TempDirectory):
  154. """Helper class that creates a temporary directory adjacent to a real one.
  155. Attributes:
  156. original
  157. The original directory to create a temp directory for.
  158. path
  159. After calling create() or entering, contains the full
  160. path to the temporary directory.
  161. delete
  162. Whether the directory should be deleted when exiting
  163. (when used as a contextmanager)
  164. """
  165. # The characters that may be used to name the temp directory
  166. # We always prepend a ~ and then rotate through these until
  167. # a usable name is found.
  168. # pkg_resources raises a different error for .dist-info folder
  169. # with leading '-' and invalid metadata
  170. LEADING_CHARS = "-~.=%0123456789"
  171. def __init__(self, original, delete=None):
  172. # type: (str, Optional[bool]) -> None
  173. self.original = original.rstrip('/\\')
  174. super(AdjacentTempDirectory, self).__init__(delete=delete)
  175. @classmethod
  176. def _generate_names(cls, name):
  177. # type: (str) -> Iterator[str]
  178. """Generates a series of temporary names.
  179. The algorithm replaces the leading characters in the name
  180. with ones that are valid filesystem characters, but are not
  181. valid package names (for both Python and pip definitions of
  182. package).
  183. """
  184. for i in range(1, len(name)):
  185. for candidate in itertools.combinations_with_replacement(
  186. cls.LEADING_CHARS, i - 1):
  187. new_name = '~' + ''.join(candidate) + name[i:]
  188. if new_name != name:
  189. yield new_name
  190. # If we make it this far, we will have to make a longer name
  191. for i in range(len(cls.LEADING_CHARS)):
  192. for candidate in itertools.combinations_with_replacement(
  193. cls.LEADING_CHARS, i):
  194. new_name = '~' + ''.join(candidate) + name
  195. if new_name != name:
  196. yield new_name
  197. def _create(self, kind):
  198. # type: (str) -> str
  199. root, name = os.path.split(self.original)
  200. for candidate in self._generate_names(name):
  201. path = os.path.join(root, candidate)
  202. try:
  203. os.mkdir(path)
  204. except OSError as ex:
  205. # Continue if the name exists already
  206. if ex.errno != errno.EEXIST:
  207. raise
  208. else:
  209. path = os.path.realpath(path)
  210. break
  211. else:
  212. # Final fallback on the default behavior.
  213. path = os.path.realpath(
  214. tempfile.mkdtemp(prefix="pip-{}-".format(kind))
  215. )
  216. logger.debug("Created temporary directory: {}".format(path))
  217. return path