123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242 |
- # The following comment should be removed at some point in the future.
- # mypy: disallow-untyped-defs=False
- from __future__ import absolute_import
- import datetime
- import hashlib
- import json
- import logging
- import os.path
- import sys
- from pip._vendor import pkg_resources
- from pip._vendor.packaging import version as packaging_version
- from pip._vendor.six import ensure_binary
- from pip._internal.index.collector import LinkCollector
- from pip._internal.index.package_finder import PackageFinder
- from pip._internal.models.search_scope import SearchScope
- from pip._internal.models.selection_prefs import SelectionPreferences
- from pip._internal.utils.filesystem import (
- adjacent_tmp_file,
- check_path_owner,
- replace,
- )
- from pip._internal.utils.misc import (
- ensure_dir,
- get_installed_version,
- redact_auth_from_url,
- )
- from pip._internal.utils.packaging import get_installer
- from pip._internal.utils.typing import MYPY_CHECK_RUNNING
- if MYPY_CHECK_RUNNING:
- import optparse
- from optparse import Values
- from typing import Any, Dict, Text, Union
- from pip._internal.network.session import PipSession
- SELFCHECK_DATE_FMT = "%Y-%m-%dT%H:%M:%SZ"
- logger = logging.getLogger(__name__)
- def make_link_collector(
- session, # type: PipSession
- options, # type: Values
- suppress_no_index=False, # type: bool
- ):
- # type: (...) -> LinkCollector
- """
- :param session: The Session to use to make requests.
- :param suppress_no_index: Whether to ignore the --no-index option
- when constructing the SearchScope object.
- """
- index_urls = [options.index_url] + options.extra_index_urls
- if options.no_index and not suppress_no_index:
- logger.debug(
- 'Ignoring indexes: %s',
- ','.join(redact_auth_from_url(url) for url in index_urls),
- )
- index_urls = []
- # Make sure find_links is a list before passing to create().
- find_links = options.find_links or []
- search_scope = SearchScope.create(
- find_links=find_links, index_urls=index_urls,
- )
- link_collector = LinkCollector(session=session, search_scope=search_scope)
- return link_collector
- def _get_statefile_name(key):
- # type: (Union[str, Text]) -> str
- key_bytes = ensure_binary(key)
- name = hashlib.sha224(key_bytes).hexdigest()
- return name
- class SelfCheckState(object):
- def __init__(self, cache_dir):
- # type: (str) -> None
- self.state = {} # type: Dict[str, Any]
- self.statefile_path = None
- # Try to load the existing state
- if cache_dir:
- self.statefile_path = os.path.join(
- cache_dir, "selfcheck", _get_statefile_name(self.key)
- )
- try:
- with open(self.statefile_path) as statefile:
- self.state = json.load(statefile)
- except (IOError, ValueError, KeyError):
- # Explicitly suppressing exceptions, since we don't want to
- # error out if the cache file is invalid.
- pass
- @property
- def key(self):
- return sys.prefix
- def save(self, pypi_version, current_time):
- # type: (str, datetime.datetime) -> None
- # If we do not have a path to cache in, don't bother saving.
- if not self.statefile_path:
- return
- # Check to make sure that we own the directory
- if not check_path_owner(os.path.dirname(self.statefile_path)):
- return
- # Now that we've ensured the directory is owned by this user, we'll go
- # ahead and make sure that all our directories are created.
- ensure_dir(os.path.dirname(self.statefile_path))
- state = {
- # Include the key so it's easy to tell which pip wrote the
- # file.
- "key": self.key,
- "last_check": current_time.strftime(SELFCHECK_DATE_FMT),
- "pypi_version": pypi_version,
- }
- text = json.dumps(state, sort_keys=True, separators=(",", ":"))
- with adjacent_tmp_file(self.statefile_path) as f:
- f.write(ensure_binary(text))
- try:
- # Since we have a prefix-specific state file, we can just
- # overwrite whatever is there, no need to check.
- replace(f.name, self.statefile_path)
- except OSError:
- # Best effort.
- pass
- def was_installed_by_pip(pkg):
- # type: (str) -> bool
- """Checks whether pkg was installed by pip
- This is used not to display the upgrade message when pip is in fact
- installed by system package manager, such as dnf on Fedora.
- """
- try:
- dist = pkg_resources.get_distribution(pkg)
- return "pip" == get_installer(dist)
- except pkg_resources.DistributionNotFound:
- return False
- def pip_self_version_check(session, options):
- # type: (PipSession, optparse.Values) -> None
- """Check for an update for pip.
- Limit the frequency of checks to once per week. State is stored either in
- the active virtualenv or in the user's USER_CACHE_DIR keyed off the prefix
- of the pip script path.
- """
- installed_version = get_installed_version("pip")
- if not installed_version:
- return
- pip_version = packaging_version.parse(installed_version)
- pypi_version = None
- try:
- state = SelfCheckState(cache_dir=options.cache_dir)
- current_time = datetime.datetime.utcnow()
- # Determine if we need to refresh the state
- if "last_check" in state.state and "pypi_version" in state.state:
- last_check = datetime.datetime.strptime(
- state.state["last_check"],
- SELFCHECK_DATE_FMT
- )
- if (current_time - last_check).total_seconds() < 7 * 24 * 60 * 60:
- pypi_version = state.state["pypi_version"]
- # Refresh the version if we need to or just see if we need to warn
- if pypi_version is None:
- # Lets use PackageFinder to see what the latest pip version is
- link_collector = make_link_collector(
- session,
- options=options,
- suppress_no_index=True,
- )
- # Pass allow_yanked=False so we don't suggest upgrading to a
- # yanked version.
- selection_prefs = SelectionPreferences(
- allow_yanked=False,
- allow_all_prereleases=False, # Explicitly set to False
- )
- finder = PackageFinder.create(
- link_collector=link_collector,
- selection_prefs=selection_prefs,
- )
- best_candidate = finder.find_best_candidate("pip").best_candidate
- if best_candidate is None:
- return
- pypi_version = str(best_candidate.version)
- # save that we've performed a check
- state.save(pypi_version, current_time)
- remote_version = packaging_version.parse(pypi_version)
- local_version_is_older = (
- pip_version < remote_version and
- pip_version.base_version != remote_version.base_version and
- was_installed_by_pip('pip')
- )
- # Determine if our pypi_version is older
- if not local_version_is_older:
- return
- # We cannot tell how the current pip is available in the current
- # command context, so be pragmatic here and suggest the command
- # that's always available. This does not accommodate spaces in
- # `sys.executable`.
- pip_cmd = "{} -m pip".format(sys.executable)
- logger.warning(
- "You are using pip version %s; however, version %s is "
- "available.\nYou should consider upgrading via the "
- "'%s install --upgrade pip' command.",
- pip_version, pypi_version, pip_cmd
- )
- except Exception:
- logger.debug(
- "There was an error checking the latest version of pip",
- exc_info=True,
- )
|