jws.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. import hashlib
  2. import time
  3. from datetime import datetime
  4. from ._compat import number_types
  5. from ._json import _CompactJSON
  6. from ._json import json
  7. from .encoding import base64_decode
  8. from .encoding import base64_encode
  9. from .encoding import want_bytes
  10. from .exc import BadData
  11. from .exc import BadHeader
  12. from .exc import BadPayload
  13. from .exc import BadSignature
  14. from .exc import SignatureExpired
  15. from .serializer import Serializer
  16. from .signer import HMACAlgorithm
  17. from .signer import NoneAlgorithm
  18. class JSONWebSignatureSerializer(Serializer):
  19. """This serializer implements JSON Web Signature (JWS) support. Only
  20. supports the JWS Compact Serialization.
  21. """
  22. jws_algorithms = {
  23. "HS256": HMACAlgorithm(hashlib.sha256),
  24. "HS384": HMACAlgorithm(hashlib.sha384),
  25. "HS512": HMACAlgorithm(hashlib.sha512),
  26. "none": NoneAlgorithm(),
  27. }
  28. #: The default algorithm to use for signature generation
  29. default_algorithm = "HS512"
  30. default_serializer = _CompactJSON
  31. def __init__(
  32. self,
  33. secret_key,
  34. salt=None,
  35. serializer=None,
  36. serializer_kwargs=None,
  37. signer=None,
  38. signer_kwargs=None,
  39. algorithm_name=None,
  40. ):
  41. Serializer.__init__(
  42. self,
  43. secret_key=secret_key,
  44. salt=salt,
  45. serializer=serializer,
  46. serializer_kwargs=serializer_kwargs,
  47. signer=signer,
  48. signer_kwargs=signer_kwargs,
  49. )
  50. if algorithm_name is None:
  51. algorithm_name = self.default_algorithm
  52. self.algorithm_name = algorithm_name
  53. self.algorithm = self.make_algorithm(algorithm_name)
  54. def load_payload(self, payload, serializer=None, return_header=False):
  55. payload = want_bytes(payload)
  56. if b"." not in payload:
  57. raise BadPayload('No "." found in value')
  58. base64d_header, base64d_payload = payload.split(b".", 1)
  59. try:
  60. json_header = base64_decode(base64d_header)
  61. except Exception as e:
  62. raise BadHeader(
  63. "Could not base64 decode the header because of an exception",
  64. original_error=e,
  65. )
  66. try:
  67. json_payload = base64_decode(base64d_payload)
  68. except Exception as e:
  69. raise BadPayload(
  70. "Could not base64 decode the payload because of an exception",
  71. original_error=e,
  72. )
  73. try:
  74. header = Serializer.load_payload(self, json_header, serializer=json)
  75. except BadData as e:
  76. raise BadHeader(
  77. "Could not unserialize header because it was malformed",
  78. original_error=e,
  79. )
  80. if not isinstance(header, dict):
  81. raise BadHeader("Header payload is not a JSON object", header=header)
  82. payload = Serializer.load_payload(self, json_payload, serializer=serializer)
  83. if return_header:
  84. return payload, header
  85. return payload
  86. def dump_payload(self, header, obj):
  87. base64d_header = base64_encode(
  88. self.serializer.dumps(header, **self.serializer_kwargs)
  89. )
  90. base64d_payload = base64_encode(
  91. self.serializer.dumps(obj, **self.serializer_kwargs)
  92. )
  93. return base64d_header + b"." + base64d_payload
  94. def make_algorithm(self, algorithm_name):
  95. try:
  96. return self.jws_algorithms[algorithm_name]
  97. except KeyError:
  98. raise NotImplementedError("Algorithm not supported")
  99. def make_signer(self, salt=None, algorithm=None):
  100. if salt is None:
  101. salt = self.salt
  102. key_derivation = "none" if salt is None else None
  103. if algorithm is None:
  104. algorithm = self.algorithm
  105. return self.signer(
  106. self.secret_key,
  107. salt=salt,
  108. sep=".",
  109. key_derivation=key_derivation,
  110. algorithm=algorithm,
  111. )
  112. def make_header(self, header_fields):
  113. header = header_fields.copy() if header_fields else {}
  114. header["alg"] = self.algorithm_name
  115. return header
  116. def dumps(self, obj, salt=None, header_fields=None):
  117. """Like :meth:`.Serializer.dumps` but creates a JSON Web
  118. Signature. It also allows for specifying additional fields to be
  119. included in the JWS header.
  120. """
  121. header = self.make_header(header_fields)
  122. signer = self.make_signer(salt, self.algorithm)
  123. return signer.sign(self.dump_payload(header, obj))
  124. def loads(self, s, salt=None, return_header=False):
  125. """Reverse of :meth:`dumps`. If requested via ``return_header``
  126. it will return a tuple of payload and header.
  127. """
  128. payload, header = self.load_payload(
  129. self.make_signer(salt, self.algorithm).unsign(want_bytes(s)),
  130. return_header=True,
  131. )
  132. if header.get("alg") != self.algorithm_name:
  133. raise BadHeader("Algorithm mismatch", header=header, payload=payload)
  134. if return_header:
  135. return payload, header
  136. return payload
  137. def loads_unsafe(self, s, salt=None, return_header=False):
  138. kwargs = {"return_header": return_header}
  139. return self._loads_unsafe_impl(s, salt, kwargs, kwargs)
  140. class TimedJSONWebSignatureSerializer(JSONWebSignatureSerializer):
  141. """Works like the regular :class:`JSONWebSignatureSerializer` but
  142. also records the time of the signing and can be used to expire
  143. signatures.
  144. JWS currently does not specify this behavior but it mentions a
  145. possible extension like this in the spec. Expiry date is encoded
  146. into the header similar to what's specified in `draft-ietf-oauth
  147. -json-web-token <http://self-issued.info/docs/draft-ietf-oauth-json
  148. -web-token.html#expDef>`_.
  149. """
  150. DEFAULT_EXPIRES_IN = 3600
  151. def __init__(self, secret_key, expires_in=None, **kwargs):
  152. JSONWebSignatureSerializer.__init__(self, secret_key, **kwargs)
  153. if expires_in is None:
  154. expires_in = self.DEFAULT_EXPIRES_IN
  155. self.expires_in = expires_in
  156. def make_header(self, header_fields):
  157. header = JSONWebSignatureSerializer.make_header(self, header_fields)
  158. iat = self.now()
  159. exp = iat + self.expires_in
  160. header["iat"] = iat
  161. header["exp"] = exp
  162. return header
  163. def loads(self, s, salt=None, return_header=False):
  164. payload, header = JSONWebSignatureSerializer.loads(
  165. self, s, salt, return_header=True
  166. )
  167. if "exp" not in header:
  168. raise BadSignature("Missing expiry date", payload=payload)
  169. int_date_error = BadHeader("Expiry date is not an IntDate", payload=payload)
  170. try:
  171. header["exp"] = int(header["exp"])
  172. except ValueError:
  173. raise int_date_error
  174. if header["exp"] < 0:
  175. raise int_date_error
  176. if header["exp"] < self.now():
  177. raise SignatureExpired(
  178. "Signature expired",
  179. payload=payload,
  180. date_signed=self.get_issue_date(header),
  181. )
  182. if return_header:
  183. return payload, header
  184. return payload
  185. def get_issue_date(self, header):
  186. rv = header.get("iat")
  187. if isinstance(rv, number_types):
  188. return datetime.utcfromtimestamp(int(rv))
  189. def now(self):
  190. return int(time.time())