| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421 | """PipSession and supporting code, containing all pip-specificnetwork request configuration and behavior."""# The following comment should be removed at some point in the future.# mypy: disallow-untyped-defs=Falseimport email.utilsimport jsonimport loggingimport mimetypesimport osimport platformimport sysimport warningsfrom pip._vendor import requests, six, urllib3from pip._vendor.cachecontrol import CacheControlAdapterfrom pip._vendor.requests.adapters import BaseAdapter, HTTPAdapterfrom pip._vendor.requests.models import Responsefrom pip._vendor.requests.structures import CaseInsensitiveDictfrom pip._vendor.six.moves.urllib import parse as urllib_parsefrom pip._vendor.urllib3.exceptions import InsecureRequestWarningfrom pip import __version__from pip._internal.network.auth import MultiDomainBasicAuthfrom pip._internal.network.cache import SafeFileCache# Import ssl from compat so the initial import occurs in only one place.from pip._internal.utils.compat import has_tls, ipaddressfrom pip._internal.utils.glibc import libc_verfrom pip._internal.utils.misc import (    build_url_from_netloc,    get_installed_version,    parse_netloc,)from pip._internal.utils.typing import MYPY_CHECK_RUNNINGfrom pip._internal.utils.urls import url_to_pathif MYPY_CHECK_RUNNING:    from typing import (        Iterator, List, Optional, Tuple, Union,    )    from pip._internal.models.link import Link    SecureOrigin = Tuple[str, str, Optional[Union[int, str]]]logger = logging.getLogger(__name__)# Ignore warning raised when using --trusted-host.warnings.filterwarnings("ignore", category=InsecureRequestWarning)SECURE_ORIGINS = [    # protocol, hostname, port    # Taken from Chrome's list of secure origins (See: http://bit.ly/1qrySKC)    ("https", "*", "*"),    ("*", "localhost", "*"),    ("*", "127.0.0.0/8", "*"),    ("*", "::1/128", "*"),    ("file", "*", None),    # ssh is always secure.    ("ssh", "*", "*"),]  # type: List[SecureOrigin]# These are environment variables present when running under various# CI systems.  For each variable, some CI systems that use the variable# are indicated.  The collection was chosen so that for each of a number# of popular systems, at least one of the environment variables is used.# This list is used to provide some indication of and lower bound for# CI traffic to PyPI.  Thus, it is okay if the list is not comprehensive.# For more background, see: https://github.com/pypa/pip/issues/5499CI_ENVIRONMENT_VARIABLES = (    # Azure Pipelines    'BUILD_BUILDID',    # Jenkins    'BUILD_ID',    # AppVeyor, CircleCI, Codeship, Gitlab CI, Shippable, Travis CI    'CI',    # Explicit environment variable.    'PIP_IS_CI',)def looks_like_ci():    # type: () -> bool    """    Return whether it looks like pip is running under CI.    """    # We don't use the method of checking for a tty (e.g. using isatty())    # because some CI systems mimic a tty (e.g. Travis CI).  Thus that    # method doesn't provide definitive information in either direction.    return any(name in os.environ for name in CI_ENVIRONMENT_VARIABLES)def user_agent():    """    Return a string representing the user agent.    """    data = {        "installer": {"name": "pip", "version": __version__},        "python": platform.python_version(),        "implementation": {            "name": platform.python_implementation(),        },    }    if data["implementation"]["name"] == 'CPython':        data["implementation"]["version"] = platform.python_version()    elif data["implementation"]["name"] == 'PyPy':        if sys.pypy_version_info.releaselevel == 'final':            pypy_version_info = sys.pypy_version_info[:3]        else:            pypy_version_info = sys.pypy_version_info        data["implementation"]["version"] = ".".join(            [str(x) for x in pypy_version_info]        )    elif data["implementation"]["name"] == 'Jython':        # Complete Guess        data["implementation"]["version"] = platform.python_version()    elif data["implementation"]["name"] == 'IronPython':        # Complete Guess        data["implementation"]["version"] = platform.python_version()    if sys.platform.startswith("linux"):        from pip._vendor import distro        distro_infos = dict(filter(            lambda x: x[1],            zip(["name", "version", "id"], distro.linux_distribution()),        ))        libc = dict(filter(            lambda x: x[1],            zip(["lib", "version"], libc_ver()),        ))        if libc:            distro_infos["libc"] = libc        if distro_infos:            data["distro"] = distro_infos    if sys.platform.startswith("darwin") and platform.mac_ver()[0]:        data["distro"] = {"name": "macOS", "version": platform.mac_ver()[0]}    if platform.system():        data.setdefault("system", {})["name"] = platform.system()    if platform.release():        data.setdefault("system", {})["release"] = platform.release()    if platform.machine():        data["cpu"] = platform.machine()    if has_tls():        import _ssl as ssl        data["openssl_version"] = ssl.OPENSSL_VERSION    setuptools_version = get_installed_version("setuptools")    if setuptools_version is not None:        data["setuptools_version"] = setuptools_version    # Use None rather than False so as not to give the impression that    # pip knows it is not being run under CI.  Rather, it is a null or    # inconclusive result.  Also, we include some value rather than no    # value to make it easier to know that the check has been run.    data["ci"] = True if looks_like_ci() else None    user_data = os.environ.get("PIP_USER_AGENT_USER_DATA")    if user_data is not None:        data["user_data"] = user_data    return "{data[installer][name]}/{data[installer][version]} {json}".format(        data=data,        json=json.dumps(data, separators=(",", ":"), sort_keys=True),    )class LocalFSAdapter(BaseAdapter):    def send(self, request, stream=None, timeout=None, verify=None, cert=None,             proxies=None):        pathname = url_to_path(request.url)        resp = Response()        resp.status_code = 200        resp.url = request.url        try:            stats = os.stat(pathname)        except OSError as exc:            resp.status_code = 404            resp.raw = exc        else:            modified = email.utils.formatdate(stats.st_mtime, usegmt=True)            content_type = mimetypes.guess_type(pathname)[0] or "text/plain"            resp.headers = CaseInsensitiveDict({                "Content-Type": content_type,                "Content-Length": stats.st_size,                "Last-Modified": modified,            })            resp.raw = open(pathname, "rb")            resp.close = resp.raw.close        return resp    def close(self):        passclass InsecureHTTPAdapter(HTTPAdapter):    def cert_verify(self, conn, url, verify, cert):        super(InsecureHTTPAdapter, self).cert_verify(            conn=conn, url=url, verify=False, cert=cert        )class InsecureCacheControlAdapter(CacheControlAdapter):    def cert_verify(self, conn, url, verify, cert):        super(InsecureCacheControlAdapter, self).cert_verify(            conn=conn, url=url, verify=False, cert=cert        )class PipSession(requests.Session):    timeout = None  # type: Optional[int]    def __init__(self, *args, **kwargs):        """        :param trusted_hosts: Domains not to emit warnings for when not using            HTTPS.        """        retries = kwargs.pop("retries", 0)        cache = kwargs.pop("cache", None)        trusted_hosts = kwargs.pop("trusted_hosts", [])  # type: List[str]        index_urls = kwargs.pop("index_urls", None)        super(PipSession, self).__init__(*args, **kwargs)        # Namespace the attribute with "pip_" just in case to prevent        # possible conflicts with the base class.        self.pip_trusted_origins = []  # type: List[Tuple[str, Optional[int]]]        # Attach our User Agent to the request        self.headers["User-Agent"] = user_agent()        # Attach our Authentication handler to the session        self.auth = MultiDomainBasicAuth(index_urls=index_urls)        # Create our urllib3.Retry instance which will allow us to customize        # how we handle retries.        retries = urllib3.Retry(            # Set the total number of retries that a particular request can            # have.            total=retries,            # A 503 error from PyPI typically means that the Fastly -> Origin            # connection got interrupted in some way. A 503 error in general            # is typically considered a transient error so we'll go ahead and            # retry it.            # A 500 may indicate transient error in Amazon S3            # A 520 or 527 - may indicate transient error in CloudFlare            status_forcelist=[500, 503, 520, 527],            # Add a small amount of back off between failed requests in            # order to prevent hammering the service.            backoff_factor=0.25,        )        # Our Insecure HTTPAdapter disables HTTPS validation. It does not        # support caching so we'll use it for all http:// URLs.        # If caching is disabled, we will also use it for        # https:// hosts that we've marked as ignoring        # TLS errors for (trusted-hosts).        insecure_adapter = InsecureHTTPAdapter(max_retries=retries)        # We want to _only_ cache responses on securely fetched origins or when        # the host is specified as trusted. We do this because        # we can't validate the response of an insecurely/untrusted fetched        # origin, and we don't want someone to be able to poison the cache and        # require manual eviction from the cache to fix it.        if cache:            secure_adapter = CacheControlAdapter(                cache=SafeFileCache(cache),                max_retries=retries,            )            self._trusted_host_adapter = InsecureCacheControlAdapter(                cache=SafeFileCache(cache),                max_retries=retries,            )        else:            secure_adapter = HTTPAdapter(max_retries=retries)            self._trusted_host_adapter = insecure_adapter        self.mount("https://", secure_adapter)        self.mount("http://", insecure_adapter)        # Enable file:// urls        self.mount("file://", LocalFSAdapter())        for host in trusted_hosts:            self.add_trusted_host(host, suppress_logging=True)    def add_trusted_host(self, host, source=None, suppress_logging=False):        # type: (str, Optional[str], bool) -> None        """        :param host: It is okay to provide a host that has previously been            added.        :param source: An optional source string, for logging where the host            string came from.        """        if not suppress_logging:            msg = 'adding trusted host: {!r}'.format(host)            if source is not None:                msg += ' (from {})'.format(source)            logger.info(msg)        host_port = parse_netloc(host)        if host_port not in self.pip_trusted_origins:            self.pip_trusted_origins.append(host_port)        self.mount(            build_url_from_netloc(host) + '/',            self._trusted_host_adapter        )        if not host_port[1]:            # Mount wildcard ports for the same host.            self.mount(                build_url_from_netloc(host) + ':',                self._trusted_host_adapter            )    def iter_secure_origins(self):        # type: () -> Iterator[SecureOrigin]        for secure_origin in SECURE_ORIGINS:            yield secure_origin        for host, port in self.pip_trusted_origins:            yield ('*', host, '*' if port is None else port)    def is_secure_origin(self, location):        # type: (Link) -> bool        # Determine if this url used a secure transport mechanism        parsed = urllib_parse.urlparse(str(location))        origin_protocol, origin_host, origin_port = (            parsed.scheme, parsed.hostname, parsed.port,        )        # The protocol to use to see if the protocol matches.        # Don't count the repository type as part of the protocol: in        # cases such as "git+ssh", only use "ssh". (I.e., Only verify against        # the last scheme.)        origin_protocol = origin_protocol.rsplit('+', 1)[-1]        # Determine if our origin is a secure origin by looking through our        # hardcoded list of secure origins, as well as any additional ones        # configured on this PackageFinder instance.        for secure_origin in self.iter_secure_origins():            secure_protocol, secure_host, secure_port = secure_origin            if origin_protocol != secure_protocol and secure_protocol != "*":                continue            try:                addr = ipaddress.ip_address(                    None                    if origin_host is None                    else six.ensure_text(origin_host)                )                network = ipaddress.ip_network(                    six.ensure_text(secure_host)                )            except ValueError:                # We don't have both a valid address or a valid network, so                # we'll check this origin against hostnames.                if (                    origin_host and                    origin_host.lower() != secure_host.lower() and                    secure_host != "*"                ):                    continue            else:                # We have a valid address and network, so see if the address                # is contained within the network.                if addr not in network:                    continue            # Check to see if the port matches.            if (                origin_port != secure_port and                secure_port != "*" and                secure_port is not None            ):                continue            # If we've gotten here, then this origin matches the current            # secure origin and we should return True            return True        # If we've gotten to this point, then the origin isn't secure and we        # will not accept it as a valid location to search. We will however        # log a warning that we are ignoring it.        logger.warning(            "The repository located at %s is not a trusted or secure host and "            "is being ignored. If this repository is available via HTTPS we "            "recommend you use HTTPS instead, otherwise you may silence "            "this warning and allow it anyway with '--trusted-host %s'.",            origin_host,            origin_host,        )        return False    def request(self, method, url, *args, **kwargs):        # Allow setting a default timeout on a session        kwargs.setdefault("timeout", self.timeout)        # Dispatch the actual request        return super(PipSession, self).request(method, url, *args, **kwargs)
 |