xmlrpc.py 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152
  1. """xmlrpclib.Transport implementation
  2. """
  3. import logging
  4. # NOTE: XMLRPC Client is not annotated in typeshed as of 2017-07-17, which is
  5. # why we ignore the type on this import
  6. from pip._vendor.six.moves import xmlrpc_client # type: ignore
  7. from pip._vendor.six.moves.urllib import parse as urllib_parse
  8. from pip._internal.exceptions import NetworkConnectionError
  9. from pip._internal.network.utils import raise_for_status
  10. from pip._internal.utils.typing import MYPY_CHECK_RUNNING
  11. if MYPY_CHECK_RUNNING:
  12. from typing import Dict
  13. from pip._internal.network.session import PipSession
  14. logger = logging.getLogger(__name__)
  15. class PipXmlrpcTransport(xmlrpc_client.Transport):
  16. """Provide a `xmlrpclib.Transport` implementation via a `PipSession`
  17. object.
  18. """
  19. def __init__(self, index_url, session, use_datetime=False):
  20. # type: (str, PipSession, bool) -> None
  21. xmlrpc_client.Transport.__init__(self, use_datetime)
  22. index_parts = urllib_parse.urlparse(index_url)
  23. self._scheme = index_parts.scheme
  24. self._session = session
  25. def request(self, host, handler, request_body, verbose=False):
  26. # type: (str, str, Dict[str, str], bool) -> None
  27. parts = (self._scheme, host, handler, None, None, None)
  28. url = urllib_parse.urlunparse(parts)
  29. try:
  30. headers = {'Content-Type': 'text/xml'}
  31. response = self._session.post(url, data=request_body,
  32. headers=headers, stream=True)
  33. raise_for_status(response)
  34. self.verbose = verbose
  35. return self.parse_response(response.raw)
  36. except NetworkConnectionError as exc:
  37. assert exc.response
  38. logger.critical(
  39. "HTTP error %s while getting %s",
  40. exc.response.status_code, url,
  41. )
  42. raise