- """
- The main purpose of this module is to expose LinkCollector.collect_links().
- """
- import cgi
- import functools
- import itertools
- import logging
- import mimetypes
- import os
- import re
- from collections import OrderedDict
- from pip._vendor import html5lib, requests
- from pip._vendor.distlib.compat import unescape
- from pip._vendor.requests.exceptions import HTTPError, RetryError, SSLError
- from pip._vendor.six.moves.urllib import parse as urllib_parse
- from pip._vendor.six.moves.urllib import request as urllib_request
- from pip._internal.models.link import Link
- from pip._internal.utils.filetypes import ARCHIVE_EXTENSIONS
- from pip._internal.utils.misc import pairwise, redact_auth_from_url
- from pip._internal.utils.typing import MYPY_CHECK_RUNNING
- from pip._internal.utils.urls import path_to_url, url_to_path
- from pip._internal.vcs import is_url, vcs
- if MYPY_CHECK_RUNNING:
- from typing import (
- Callable, Iterable, List, MutableMapping, Optional,
- Protocol, Sequence, Tuple, TypeVar, Union,
- )
- import xml.etree.ElementTree
- from pip._vendor.requests import Response
- from pip._internal.models.search_scope import SearchScope
- from pip._internal.network.session import PipSession
- HTMLElement = xml.etree.ElementTree.Element
- ResponseHeaders = MutableMapping[str, str]
- # Used in the @lru_cache polyfill.
- F = TypeVar('F')
- class LruCache(Protocol):
- def __call__(self, maxsize=None):
- # type: (Optional[int]) -> Callable[[F], F]
- raise NotImplementedError
- logger = logging.getLogger(__name__)
- # Fallback to noop_lru_cache in Python 2
- # TODO: this can be removed when python 2 support is dropped!
- def noop_lru_cache(maxsize=None):
- # type: (Optional[int]) -> Callable[[F], F]
- def _wrapper(f):
- # type: (F) -> F
- return f
- return _wrapper
- _lru_cache = getattr(functools, "lru_cache", noop_lru_cache) # type: LruCache
- def _match_vcs_scheme(url):
- # type: (str) -> Optional[str]
- """Look for VCS schemes in the URL.
- Returns the matched VCS scheme, or None if there's no match.
- """
- for scheme in vcs.schemes:
- if url.lower().startswith(scheme) and url[len(scheme)] in '+:':
- return scheme
- return None
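
# Example (illustrative): a pinned VCS requirement URL such as
# "git+https://github.com/pypa/pip.git@20.2" starts with one of vcs.schemes
# followed by "+" or ":", so _match_vcs_scheme() returns the matched scheme
# (e.g. "git+https"), while a plain index URL like "https://pypi.org/simple/"
# matches nothing and returns None.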


def _is_url_like_archive(url):
    # type: (str) -> bool
    """Return whether the URL looks like an archive.
    """
    filename = Link(url).filename
    for bad_ext in ARCHIVE_EXTENSIONS:
        if filename.endswith(bad_ext):
            return True
    return False
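
# Example (illustrative): "https://files.example.com/pip-20.2.tar.gz" ends in
# ".tar.gz", which is in ARCHIVE_EXTENSIONS, so this returns True;
# "https://pypi.org/simple/pip/" returns False.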


class _NotHTML(Exception):
    def __init__(self, content_type, request_desc):
        # type: (str, str) -> None
        super(_NotHTML, self).__init__(content_type, request_desc)
        self.content_type = content_type
        self.request_desc = request_desc


def _ensure_html_header(response):
    # type: (Response) -> None
    """Check the Content-Type header to ensure the response contains HTML.

    Raises `_NotHTML` if the content type is not text/html.
    """
    content_type = response.headers.get("Content-Type", "")
    if not content_type.lower().startswith("text/html"):
        raise _NotHTML(content_type, response.request.method)
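
# Example (illustrative): if a GET response carries
# "Content-Type: application/octet-stream", this raises
# _NotHTML("application/octet-stream", "GET"); a "text/html; charset=utf-8"
# response passes the check.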


class _NotHTTP(Exception):
    pass


def _ensure_html_response(url, session):
    # type: (str, PipSession) -> None
    """Send a HEAD request to the URL, and ensure the response contains HTML.

    Raises `_NotHTTP` if the URL is not available for a HEAD request, or
    `_NotHTML` if the content type is not text/html.
    """
    scheme, netloc, path, query, fragment = urllib_parse.urlsplit(url)
    if scheme not in {'http', 'https'}:
        raise _NotHTTP()

    resp = session.head(url, allow_redirects=True)
    resp.raise_for_status()

    _ensure_html_header(resp)


def _get_html_response(url, session):
    # type: (str, PipSession) -> Response
    """Access an HTML page with GET, and return the response.

    This consists of three parts:

    1. If the URL looks suspiciously like an archive, send a HEAD first to
       check the Content-Type is HTML, to avoid downloading a large file.
       Raise `_NotHTTP` if the content type cannot be determined, or
       `_NotHTML` if it is not HTML.
    2. Actually perform the request. Raise HTTP exceptions on network failures.
    3. Check the Content-Type header to make sure we got HTML, and raise
       `_NotHTML` otherwise.
    """
    if _is_url_like_archive(url):
        _ensure_html_response(url, session=session)

    logger.debug('Getting page %s', redact_auth_from_url(url))
    resp = session.get(
        url,
        headers={
            "Accept": "text/html",
            # We don't want to blindly return cached data for /simple/,
            # because authors generally expect that "twine upload && pip
            # install" will work, but it won't if pip served a page it
            # cached within the last ~10 minutes. Setting max-age=0 means
            # we never use cached data without revalidating it. The benefit
            # of max-age=0 over no-cache is that conditional requests are
            # still supported, so when the page hasn't changed we only pay
            # for the round trip of the conditional GET instead of
            # re-downloading the page.
            # For more information, please see pypa/pip#5670.
            "Cache-Control": "max-age=0",
        },
    )
    resp.raise_for_status()

    # The check for archives above only works if the url ends with
    # something that looks like an archive. However that is not a
    # requirement of an url. Unless we issue a HEAD request on every
    # url we cannot know ahead of time for sure if something is HTML
    # or not. However we can check after we've downloaded it.
    _ensure_html_header(resp)

    return resp


def _get_encoding_from_headers(headers):
    # type: (ResponseHeaders) -> Optional[str]
    """Return the charset declared in the Content-Type header, if any.
    """
    if headers and "Content-Type" in headers:
        content_type, params = cgi.parse_header(headers["Content-Type"])
        if "charset" in params:
            return params['charset']
    return None
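
# Example (illustrative): for headers containing
# "Content-Type: text/html; charset=UTF-8", cgi.parse_header() yields
# ('text/html', {'charset': 'UTF-8'}), so this returns "UTF-8"; without a
# charset parameter it returns None.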


def _determine_base_url(document, page_url):
    # type: (HTMLElement, str) -> str
    """Determine the HTML document's base URL.

    This looks for a ``<base>`` tag in the HTML document. If present, its href
    attribute denotes the base URL of anchor tags in the document. If there is
    no such tag (or if it does not have a valid href attribute), the HTML
    file's URL is used as the base URL.

    :param document: An HTML document representation. The current
        implementation expects the result of ``html5lib.parse()``.
    :param page_url: The URL of the HTML document.
    """
    for base in document.findall(".//base"):
        href = base.get("href")
        if href is not None:
            return href
    return page_url
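
# Example (illustrative): a page at "https://index.example.com/simple/pkg/"
# containing '<base href="https://mirror.example.com/packages/">' resolves
# relative anchor hrefs against the mirror URL; without a <base> tag the
# page URL itself is used.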


def _clean_url_path_part(part):
    # type: (str) -> str
    """
    Clean a "part" of a URL path (i.e. after splitting on "@" characters).
    """
    # We unquote prior to quoting to make sure nothing is double quoted.
    return urllib_parse.quote(urllib_parse.unquote(part))
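
# Example (illustrative): "a b" becomes "a%20b", while an already-quoted
# "a%20b" is first unquoted to "a b" and re-quoted back to "a%20b" rather
# than being double-quoted to "a%2520b".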


def _clean_file_url_path(part):
    # type: (str) -> str
    """
    Clean the first part of a URL path that corresponds to a local
    filesystem path (i.e. the first part after splitting on "@" characters).
    """
    # We unquote prior to quoting to make sure nothing is double quoted.
    # Also, on Windows the path part might contain a drive letter which
    # should not be quoted. On Linux where drive letters do not
    # exist, the colon should be quoted. We rely on urllib.request
    # to do the right thing here.
    return urllib_request.pathname2url(urllib_request.url2pathname(part))


# "%2F" is the percent-encoded form of "/".
_reserved_chars_re = re.compile('(@|%2F)', re.IGNORECASE)


def _clean_url_path(path, is_local_path):
    # type: (str, bool) -> str
    """
    Clean the path portion of a URL.
    """
    if is_local_path:
        clean_func = _clean_file_url_path
    else:
        clean_func = _clean_url_path_part

    # Split on the reserved characters prior to cleaning so that
    # revision strings in VCS URLs are properly preserved.
    parts = _reserved_chars_re.split(path)

    cleaned_parts = []
    for to_clean, reserved in pairwise(itertools.chain(parts, [''])):
        cleaned_parts.append(clean_func(to_clean))
        # Normalize %xx escapes (e.g. %2f -> %2F)
        cleaned_parts.append(reserved.upper())

    return ''.join(cleaned_parts)
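
# Example (illustrative): for the non-local path "/a b@c d",
# _reserved_chars_re.split() yields ['/a b', '@', 'c d']; each text chunk is
# quoted and the reserved separators are kept (upper-cased), giving
# "/a%20b@c%20d". Preserving "@" is what keeps VCS revision suffixes such as
# "@v1.0" intact through cleaning.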


def _clean_link(url):
    # type: (str) -> str
    """
    Make sure a link is fully quoted.

    For example, if ' ' occurs in the URL, it will be replaced with "%20",
    without double-quoting characters that are already percent-encoded.
    """
    # Split the URL into parts according to the general structure
    # `scheme://netloc/path;parameters?query#fragment`.
    result = urllib_parse.urlparse(url)
    # If the netloc is empty, then the URL refers to a local filesystem path.
    is_local_path = not result.netloc
    path = _clean_url_path(result.path, is_local_path=is_local_path)
    return urllib_parse.urlunparse(result._replace(path=path))
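
# Example (illustrative):
#   _clean_link("https://files.example.com/pkg 1.0.tar.gz")
# returns "https://files.example.com/pkg%201.0.tar.gz" -- only the path
# component is re-quoted; scheme, netloc, query and fragment are left alone.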


def _create_link_from_element(
    anchor,    # type: HTMLElement
    page_url,  # type: str
    base_url,  # type: str
):
    # type: (...) -> Optional[Link]
    """
    Convert an anchor element in a simple repository page to a Link.
    """
    href = anchor.get("href")
    if not href:
        return None

    url = _clean_link(urllib_parse.urljoin(base_url, href))
    pyrequire = anchor.get('data-requires-python')
    pyrequire = unescape(pyrequire) if pyrequire else None

    yanked_reason = anchor.get('data-yanked')
    if yanked_reason:
        # This is a unicode string in Python 2 (and 3).
        yanked_reason = unescape(yanked_reason)

    link = Link(
        url,
        comes_from=page_url,
        requires_python=pyrequire,
        yanked_reason=yanked_reason,
    )

    return link
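
# Example (illustrative): on a page at "https://index.example.com/simple/pkg/",
# an anchor with href="pkg-1.0.tar.gz" becomes a Link whose url is
# "https://index.example.com/simple/pkg/pkg-1.0.tar.gz", whose comes_from is
# the page URL, and whose requires_python carries the unescaped value of the
# data-requires-python attribute, if present.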


class CacheablePageContent(object):
    def __init__(self, page):
        # type: (HTMLPage) -> None
        assert page.cache_link_parsing
        self.page = page

    def __eq__(self, other):
        # type: (object) -> bool
        return (isinstance(other, type(self)) and
                self.page.url == other.page.url)

    def __hash__(self):
        # type: () -> int
        return hash(self.page.url)


def with_cached_html_pages(
    fn,  # type: Callable[[HTMLPage], Iterable[Link]]
):
    # type: (...) -> Callable[[HTMLPage], List[Link]]
    """
    Given a function that parses an Iterable[Link] from an HTMLPage, cache the
    function's result (keyed by CacheablePageContent), unless the HTMLPage
    `page` has `page.cache_link_parsing == False`.
    """
    @_lru_cache(maxsize=None)
    def wrapper(cacheable_page):
        # type: (CacheablePageContent) -> List[Link]
        return list(fn(cacheable_page.page))

    @functools.wraps(fn)
    def wrapper_wrapper(page):
        # type: (HTMLPage) -> List[Link]
        if page.cache_link_parsing:
            return wrapper(CacheablePageContent(page))
        return list(fn(page))

    return wrapper_wrapper
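
# Note (illustrative): because CacheablePageContent hashes and compares by
# page.url, calling the decorated function twice for pages with the same URL
# (and cache_link_parsing=True) returns the lru_cache'd List[Link] instead of
# re-parsing the HTML; on Python 2 the no-op polyfill simply disables caching.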


@with_cached_html_pages
def parse_links(page):
    # type: (HTMLPage) -> Iterable[Link]
    """
    Parse an HTML document, and yield its anchor elements as Link objects.
    """
    document = html5lib.parse(
        page.content,
        transport_encoding=page.encoding,
        namespaceHTMLElements=False,
    )

    url = page.url
    base_url = _determine_base_url(document, url)
    for anchor in document.findall(".//a"):
        link = _create_link_from_element(
            anchor,
            page_url=url,
            base_url=base_url,
        )
        if link is None:
            continue
        yield link
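
# Example usage (illustrative sketch; the page bytes would normally come from
# _get_html_page() rather than being constructed by hand):
#
#   page = HTMLPage(
#       b'<a href="pkg-1.0.tar.gz">pkg-1.0.tar.gz</a>',
#       encoding=None,
#       url="https://index.example.com/simple/pkg/",
#   )
#   links = parse_links(page)
#   # -> one Link for "https://index.example.com/simple/pkg/pkg-1.0.tar.gz"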


class HTMLPage(object):
    """Represents one page, along with its URL"""

    def __init__(
        self,
        content,                  # type: bytes
        encoding,                 # type: Optional[str]
        url,                      # type: str
        cache_link_parsing=True,  # type: bool
    ):
        # type: (...) -> None
        """
        :param encoding: the encoding to decode the given content.
        :param url: the URL from which the HTML was downloaded.
        :param cache_link_parsing: whether links parsed from this page's url
                                   should be cached. PyPI index urls should
                                   have this set to False, for example.
        """
        self.content = content
        self.encoding = encoding
        self.url = url
        self.cache_link_parsing = cache_link_parsing

    def __str__(self):
        # type: () -> str
        return redact_auth_from_url(self.url)


def _handle_get_page_fail(
    link,  # type: Link
    reason,  # type: Union[str, Exception]
    meth=None  # type: Optional[Callable[..., None]]
):
    # type: (...) -> None
    if meth is None:
        meth = logger.debug
    meth("Could not fetch URL %s: %s - skipping", link, reason)


def _make_html_page(response, cache_link_parsing=True):
    # type: (Response, bool) -> HTMLPage
    encoding = _get_encoding_from_headers(response.headers)
    return HTMLPage(
        response.content,
        encoding=encoding,
        url=response.url,
        cache_link_parsing=cache_link_parsing)


def _get_html_page(link, session=None):
    # type: (Link, Optional[PipSession]) -> Optional[HTMLPage]
    if session is None:
        raise TypeError(
            "_get_html_page() missing 1 required keyword argument: 'session'"
        )

    url = link.url.split('#', 1)[0]

    # Check for VCS schemes that do not support lookup as web pages.
    vcs_scheme = _match_vcs_scheme(url)
    if vcs_scheme:
        logger.debug('Cannot look at %s URL %s', vcs_scheme, link)
        return None

    # Tack index.html onto file:// URLs that point to directories
    scheme, _, path, _, _, _ = urllib_parse.urlparse(url)
    if (scheme == 'file' and os.path.isdir(urllib_request.url2pathname(path))):
        # add trailing slash if not present so urljoin doesn't trim
        # final segment
        if not url.endswith('/'):
            url += '/'
        url = urllib_parse.urljoin(url, 'index.html')
        logger.debug(' file: URL is directory, getting %s', url)

    try:
        resp = _get_html_response(url, session=session)
    except _NotHTTP:
        logger.debug(
            'Skipping page %s because it looks like an archive, and cannot '
            'be checked by HEAD.', link,
        )
    except _NotHTML as exc:
        logger.debug(
            'Skipping page %s because the %s request got Content-Type: %s',
            link, exc.request_desc, exc.content_type,
        )
    except HTTPError as exc:
        _handle_get_page_fail(link, exc)
    except RetryError as exc:
        _handle_get_page_fail(link, exc)
    except SSLError as exc:
        reason = "There was a problem confirming the ssl certificate: "
        reason += str(exc)
        _handle_get_page_fail(link, reason, meth=logger.info)
    except requests.ConnectionError as exc:
        _handle_get_page_fail(link, "connection error: {}".format(exc))
    except requests.Timeout:
        _handle_get_page_fail(link, "timed out")
    else:
        return _make_html_page(resp,
                               cache_link_parsing=link.cache_link_parsing)
    return None


def _remove_duplicate_links(links):
    # type: (Iterable[Link]) -> List[Link]
    """
    Return a list of links, with duplicates removed and ordering preserved.
    """
    # We preserve the ordering when removing duplicates because we can.
    return list(OrderedDict.fromkeys(links))


def group_locations(locations, expand_dir=False):
    # type: (Sequence[str], bool) -> Tuple[List[str], List[str]]
    """
    Divide a list of locations into two groups: "files" (archives) and "urls."

    :return: A pair of lists (files, urls).
    """
    files = []
    urls = []

    # puts the url for the given file path into the appropriate list
    def sort_path(path):
        # type: (str) -> None
        url = path_to_url(path)
        if mimetypes.guess_type(url, strict=False)[0] == 'text/html':
            urls.append(url)
        else:
            files.append(url)

    for url in locations:
        is_local_path = os.path.exists(url)
        is_file_url = url.startswith('file:')

        if is_local_path or is_file_url:
            if is_local_path:
                path = url
            else:
                path = url_to_path(url)
            if os.path.isdir(path):
                if expand_dir:
                    path = os.path.realpath(path)
                    for item in os.listdir(path):
                        sort_path(os.path.join(path, item))
                elif is_file_url:
                    urls.append(url)
                else:
                    logger.warning(
                        "Path '{0}' is ignored: "
                        "it is a directory.".format(path),
                    )
            elif os.path.isfile(path):
                sort_path(path)
            else:
                logger.warning(
                    "Url '%s' is ignored: it is neither a file "
                    "nor a directory.", url,
                )
        elif is_url(url):
            # Only add url with clear scheme
            urls.append(url)
        else:
            logger.warning(
                "Url '%s' is ignored. It is either a non-existing "
                "path or lacks a specific scheme.", url,
            )

    return files, urls
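
# Example (illustrative):
#   group_locations(["/tmp/wheelhouse", "https://pypi.org/simple/pip/"],
#                   expand_dir=True)
# would list each archive found under /tmp/wheelhouse (e.g. *.whl, *.tar.gz)
# as a file:// URL in the first list and put the index URL in the second;
# an .html file found in the directory would be classified as a "url" instead.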


class CollectedLinks(object):

    """
    Encapsulates the return value of a call to LinkCollector.collect_links().

    The return value includes both URLs to project pages containing package
    links, as well as individual package Link objects collected from other
    sources.

    This info is stored separately as:

    (1) links from the configured file locations,
    (2) links from the configured find_links, and
    (3) urls to HTML project pages, as described by the PEP 503 simple
        repository API.
    """

    def __init__(
        self,
        files,         # type: List[Link]
        find_links,    # type: List[Link]
        project_urls,  # type: List[Link]
    ):
        # type: (...) -> None
        """
        :param files: Links from file locations.
        :param find_links: Links from find_links.
        :param project_urls: URLs to HTML project pages, as described by
            the PEP 503 simple repository API.
        """
        self.files = files
        self.find_links = find_links
        self.project_urls = project_urls


class LinkCollector(object):

    """
    Responsible for collecting Link objects from all configured locations,
    making network requests as needed.

    The class's main method is its collect_links() method.
    """

    def __init__(
        self,
        session,       # type: PipSession
        search_scope,  # type: SearchScope
    ):
        # type: (...) -> None
        self.search_scope = search_scope
        self.session = session

    @property
    def find_links(self):
        # type: () -> List[str]
        return self.search_scope.find_links

    def fetch_page(self, location):
        # type: (Link) -> Optional[HTMLPage]
        """
        Fetch an HTML page containing package links.
        """
        return _get_html_page(location, session=self.session)

    def collect_links(self, project_name):
        # type: (str) -> CollectedLinks
        """Find all available links for the given project name.

        :return: All the Link objects (unfiltered), as a CollectedLinks object.
        """
        search_scope = self.search_scope
        index_locations = search_scope.get_index_urls_locations(project_name)
        index_file_loc, index_url_loc = group_locations(index_locations)
        fl_file_loc, fl_url_loc = group_locations(
            self.find_links, expand_dir=True,
        )

        file_links = [
            Link(url) for url in itertools.chain(index_file_loc, fl_file_loc)
        ]

        # We trust every directly linked archive in find_links
        find_link_links = [Link(url, '-f') for url in self.find_links]

        # We trust every url that the user has given us whether it was given
        # via --index-url or --find-links.
        # We want to filter out anything that does not have a secure origin.
        url_locations = [
            link for link in itertools.chain(
                # Mark PyPI indices as "cache_link_parsing == False" -- this
                # will avoid caching the result of parsing the page for links.
                (Link(url, cache_link_parsing=False) for url in index_url_loc),
                (Link(url) for url in fl_url_loc),
            )
            if self.session.is_secure_origin(link)
        ]

        url_locations = _remove_duplicate_links(url_locations)
        lines = [
            '{} location(s) to search for versions of {}:'.format(
                len(url_locations), project_name,
            ),
        ]
        for link in url_locations:
            lines.append('* {}'.format(link))
        logger.debug('\n'.join(lines))

        return CollectedLinks(
            files=file_links,
            find_links=find_link_links,
            project_urls=url_locations,
        )
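
# Example usage (illustrative sketch; assumes a PipSession and SearchScope
# have already been built by the caller, e.g. from command-line options):
#
#   collector = LinkCollector(session=session, search_scope=search_scope)
#   collected = collector.collect_links("pip")
#   pages = [
#       collector.fetch_page(location)
#       for location in collected.project_urls
#   ]
#   links = [link for page in pages if page for link in parse_links(page)]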