- """Download files with progress indicators.
- """
- import cgi
- import logging
- import mimetypes
- import os
- from pip._vendor import requests
- from pip._vendor.requests.models import CONTENT_CHUNK_SIZE
- from pip._internal.cli.progress_bars import DownloadProgressProvider
- from pip._internal.models.index import PyPI
- from pip._internal.network.cache import is_from_cache
- from pip._internal.network.utils import response_chunks
- from pip._internal.utils.misc import (
- format_size,
- redact_auth_from_url,
- splitext,
- )
- from pip._internal.utils.typing import MYPY_CHECK_RUNNING
- if MYPY_CHECK_RUNNING:
- from typing import Iterable, Optional
- from pip._vendor.requests.models import Response
- from pip._internal.models.link import Link
- from pip._internal.network.session import PipSession
- logger = logging.getLogger(__name__)


def _get_http_response_size(resp):
    # type: (Response) -> Optional[int]
    try:
        return int(resp.headers['content-length'])
    except (ValueError, KeyError, TypeError):
        return None
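
# _get_http_response_size returns None when the server does not send a usable
# Content-Length header (for example with chunked transfer encoding), so
# callers must treat the download size as unknown in that case.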


def _prepare_download(
    resp,  # type: Response
    link,  # type: Link
    progress_bar  # type: str
):
    # type: (...) -> Iterable[bytes]
    total_length = _get_http_response_size(resp)

    if link.netloc == PyPI.file_storage_domain:
        url = link.show_url
    else:
        url = link.url_without_fragment

    logged_url = redact_auth_from_url(url)

    if total_length:
        logged_url = '{} ({})'.format(logged_url, format_size(total_length))

    if is_from_cache(resp):
        logger.info("Using cached %s", logged_url)
    else:
        logger.info("Downloading %s", logged_url)

    if logger.getEffectiveLevel() > logging.INFO:
        show_progress = False
    elif is_from_cache(resp):
        show_progress = False
    elif not total_length:
        show_progress = True
    elif total_length > (40 * 1000):
        show_progress = True
    else:
        show_progress = False

    chunks = response_chunks(resp, CONTENT_CHUNK_SIZE)

    if not show_progress:
        return chunks

    return DownloadProgressProvider(
        progress_bar, max=total_length
    )(chunks)
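
# _prepare_download only wraps the chunk iterator in a progress bar when
# logging is at INFO or more verbose, the response was not served from the
# local cache, and the size is either unknown or above roughly 40 kB.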


def sanitize_content_filename(filename):
    # type: (str) -> str
    """
    Sanitize the "filename" value from a Content-Disposition header.
    """
    return os.path.basename(filename)
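
# sanitize_content_filename reduces e.g. "../../etc/passwd" to "passwd", so a
# hostile Content-Disposition header cannot direct the write outside the
# download directory.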


def parse_content_disposition(content_disposition, default_filename):
    # type: (str, str) -> str
    """
    Parse the "filename" value from a Content-Disposition header, and
    return the default filename if the result is empty.
    """
    _type, params = cgi.parse_header(content_disposition)
    filename = params.get('filename')
    if filename:
        # We need to sanitize the filename to prevent directory traversal
        # in case the filename contains ".." path parts.
        filename = sanitize_content_filename(filename)
    return filename or default_filename
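
# For example, parse_content_disposition(
#     'attachment; filename="pkg-1.0.tar.gz"', 'fallback.tar.gz'
# ) returns 'pkg-1.0.tar.gz'; if the header carries no usable filename, the
# given default is returned instead.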


def _get_http_response_filename(resp, link):
    # type: (Response, Link) -> str
    """Get an ideal filename from the given HTTP response, falling back to
    the link filename if not provided.
    """
    filename = link.filename  # fallback
    # Have a look at the Content-Disposition header for a better guess
    content_disposition = resp.headers.get('content-disposition')
    if content_disposition:
        filename = parse_content_disposition(content_disposition, filename)
    ext = splitext(filename)[1]  # type: Optional[str]
    if not ext:
        ext = mimetypes.guess_extension(
            resp.headers.get('content-type', '')
        )
        if ext:
            filename += ext
    if not ext and link.url != resp.url:
        ext = os.path.splitext(resp.url)[1]
        if ext:
            filename += ext
    return filename
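
# _get_http_response_filename prefers the Content-Disposition filename over
# the link's own filename, then tries to recover a missing extension from the
# Content-Type header and finally, if the request was redirected, from the
# final response URL.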


def _http_get_download(session, link):
    # type: (PipSession, Link) -> Response
    target_url = link.url.split('#', 1)[0]
    resp = session.get(
        target_url,
        # We use Accept-Encoding: identity here because requests
        # defaults to accepting compressed responses. This breaks in
        # a variety of ways depending on how the server is configured.
        # - Some servers will notice that the file isn't a compressible
        #   file and will leave the file alone, with an empty
        #   Content-Encoding.
        # - Some servers will notice that the file is already
        #   compressed, will leave the file alone, and will add a
        #   Content-Encoding: gzip header.
        # - Some servers won't notice anything at all and will take
        #   a file that's already been compressed, compress it again,
        #   and set the Content-Encoding: gzip header.
        # By setting this to request only the identity encoding we're
        # hoping to eliminate the third case. Hopefully there does not
        # exist a server which, when given a file, will notice it is
        # already compressed and that you're not asking for a
        # compressed file, and will then decompress it before sending,
        # because if that's the case it will never be possible to make
        # this work.
        headers={"Accept-Encoding": "identity"},
        stream=True,
    )
    resp.raise_for_status()
    return resp
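
# _http_get_download passes stream=True so that requests defers reading the
# body; _prepare_download can then iterate over the response in fixed-size
# chunks instead of buffering the whole file in memory.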


class Download(object):
    def __init__(
        self,
        response,  # type: Response
        filename,  # type: str
        chunks,  # type: Iterable[bytes]
    ):
        # type: (...) -> None
        self.response = response
        self.filename = filename
        self.chunks = chunks
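
# Download is a simple value object bundling the raw response, the filename
# the file should be saved under, and the (possibly progress-wrapped) chunk
# iterator.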


class Downloader(object):
    def __init__(
        self,
        session,  # type: PipSession
        progress_bar,  # type: str
    ):
        # type: (...) -> None
        self._session = session
        self._progress_bar = progress_bar

    def __call__(self, link):
        # type: (Link) -> Download
        try:
            resp = _http_get_download(self._session, link)
        except requests.HTTPError as e:
            logger.critical(
                "HTTP error %s while getting %s", e.response.status_code, link
            )
            raise
        return Download(
            resp,
            _get_http_response_filename(resp, link),
            _prepare_download(resp, link, self._progress_bar),
        )
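

# A rough usage sketch, assuming a PipSession and a Link created elsewhere
# (the variable names below are illustrative, not defined in this module):
#
#     downloader = Downloader(session, progress_bar="on")
#     download = downloader(link)
#     with open(download.filename, "wb") as f:
#         for chunk in download.chunks:
#             f.write(chunk)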