timed.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. import time
  2. from datetime import datetime
  3. from ._compat import text_type
  4. from .encoding import base64_decode
  5. from .encoding import base64_encode
  6. from .encoding import bytes_to_int
  7. from .encoding import int_to_bytes
  8. from .encoding import want_bytes
  9. from .exc import BadSignature
  10. from .exc import BadTimeSignature
  11. from .exc import SignatureExpired
  12. from .serializer import Serializer
  13. from .signer import Signer
  14. class TimestampSigner(Signer):
  15. """Works like the regular :class:`.Signer` but also records the time
  16. of the signing and can be used to expire signatures. The
  17. :meth:`unsign` method can raise :exc:`.SignatureExpired` if the
  18. unsigning failed because the signature is expired.
  19. """
  20. def get_timestamp(self):
  21. """Returns the current timestamp. The function must return an
  22. integer.
  23. """
  24. return int(time.time())
  25. def timestamp_to_datetime(self, ts):
  26. """Used to convert the timestamp from :meth:`get_timestamp` into
  27. a datetime object.
  28. """
  29. return datetime.utcfromtimestamp(ts)
  30. def sign(self, value):
  31. """Signs the given string and also attaches time information."""
  32. value = want_bytes(value)
  33. timestamp = base64_encode(int_to_bytes(self.get_timestamp()))
  34. sep = want_bytes(self.sep)
  35. value = value + sep + timestamp
  36. return value + sep + self.get_signature(value)
  37. def unsign(self, value, max_age=None, return_timestamp=False):
  38. """Works like the regular :meth:`.Signer.unsign` but can also
  39. validate the time. See the base docstring of the class for
  40. the general behavior. If ``return_timestamp`` is ``True`` the
  41. timestamp of the signature will be returned as a naive
  42. :class:`datetime.datetime` object in UTC.
  43. """
  44. try:
  45. result = Signer.unsign(self, value)
  46. sig_error = None
  47. except BadSignature as e:
  48. sig_error = e
  49. result = e.payload or b""
  50. sep = want_bytes(self.sep)
  51. # If there is no timestamp in the result there is something
  52. # seriously wrong. In case there was a signature error, we raise
  53. # that one directly, otherwise we have a weird situation in
  54. # which we shouldn't have come except someone uses a time-based
  55. # serializer on non-timestamp data, so catch that.
  56. if sep not in result:
  57. if sig_error:
  58. raise sig_error
  59. raise BadTimeSignature("timestamp missing", payload=result)
  60. value, timestamp = result.rsplit(sep, 1)
  61. try:
  62. timestamp = bytes_to_int(base64_decode(timestamp))
  63. except Exception:
  64. timestamp = None
  65. # Signature is *not* okay. Raise a proper error now that we have
  66. # split the value and the timestamp.
  67. if sig_error is not None:
  68. raise BadTimeSignature(
  69. text_type(sig_error), payload=value, date_signed=timestamp
  70. )
  71. # Signature was okay but the timestamp is actually not there or
  72. # malformed. Should not happen, but we handle it anyway.
  73. if timestamp is None:
  74. raise BadTimeSignature("Malformed timestamp", payload=value)
  75. # Check timestamp is not older than max_age
  76. if max_age is not None:
  77. age = self.get_timestamp() - timestamp
  78. if age > max_age:
  79. raise SignatureExpired(
  80. "Signature age %s > %s seconds" % (age, max_age),
  81. payload=value,
  82. date_signed=self.timestamp_to_datetime(timestamp),
  83. )
  84. if return_timestamp:
  85. return value, self.timestamp_to_datetime(timestamp)
  86. return value
  87. def validate(self, signed_value, max_age=None):
  88. """Only validates the given signed value. Returns ``True`` if
  89. the signature exists and is valid."""
  90. try:
  91. self.unsign(signed_value, max_age=max_age)
  92. return True
  93. except BadSignature:
  94. return False
  95. class TimedSerializer(Serializer):
  96. """Uses :class:`TimestampSigner` instead of the default
  97. :class:`.Signer`.
  98. """
  99. default_signer = TimestampSigner
  100. def loads(self, s, max_age=None, return_timestamp=False, salt=None):
  101. """Reverse of :meth:`dumps`, raises :exc:`.BadSignature` if the
  102. signature validation fails. If a ``max_age`` is provided it will
  103. ensure the signature is not older than that time in seconds. In
  104. case the signature is outdated, :exc:`.SignatureExpired` is
  105. raised. All arguments are forwarded to the signer's
  106. :meth:`~TimestampSigner.unsign` method.
  107. """
  108. s = want_bytes(s)
  109. last_exception = None
  110. for signer in self.iter_unsigners(salt):
  111. try:
  112. base64d, timestamp = signer.unsign(s, max_age, return_timestamp=True)
  113. payload = self.load_payload(base64d)
  114. if return_timestamp:
  115. return payload, timestamp
  116. return payload
  117. # If we get a signature expired it means we could read the
  118. # signature but it's invalid. In that case we do not want to
  119. # try the next signer.
  120. except SignatureExpired:
  121. raise
  122. except BadSignature as err:
  123. last_exception = err
  124. raise last_exception
  125. def loads_unsafe(self, s, max_age=None, salt=None):
  126. load_kwargs = {"max_age": max_age}
  127. load_payload_kwargs = {}
  128. return self._loads_unsafe_impl(s, salt, load_kwargs, load_payload_kwargs)