12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307 |
- # -*- coding: utf-8 -*-
- """
- werkzeug.http
- ~~~~~~~~~~~~~
- Werkzeug comes with a bunch of utilities that help Werkzeug to deal with
- HTTP data. Most of the classes and functions provided by this module are
- used by the wrappers, but they are useful on their own, too, especially if
- the response and request objects are not used.
- This covers some of the more HTTP centric features of WSGI, some other
- utilities such as cookie handling are documented in the `werkzeug.utils`
- module.
- :copyright: 2007 Pallets
- :license: BSD-3-Clause
- """
- import base64
- import re
- import warnings
- from datetime import datetime
- from datetime import timedelta
- from hashlib import md5
- from time import gmtime
- from time import time
- from ._compat import integer_types
- from ._compat import iteritems
- from ._compat import PY2
- from ._compat import string_types
- from ._compat import text_type
- from ._compat import to_bytes
- from ._compat import to_unicode
- from ._compat import try_coerce_native
- from ._internal import _cookie_parse_impl
- from ._internal import _cookie_quote
- from ._internal import _make_cookie_domain
- try:
- from email.utils import parsedate_tz
- except ImportError:
- from email.Utils import parsedate_tz
- try:
- from urllib.request import parse_http_list as _parse_list_header
- from urllib.parse import unquote_to_bytes as _unquote
- except ImportError:
- from urllib2 import parse_http_list as _parse_list_header
- from urllib2 import unquote as _unquote
# Charset names used by the header helpers in this module.
# NOTE(review): ``_cookie_charset`` is not referenced in this part of the
# file; presumably it is the charset for cookie data -- confirm at the use
# site.
_cookie_charset = "latin1"
# Charset used to decode the username and password of a ``Basic``
# Authorization header in ``parse_authorization_header``.
_basic_auth_charset = "utf-8"
# Pattern scanned with ``finditer`` by ``parse_accept_header``.  Group 1 is
# the value including any non-"q" parameters, group 2 is the optional
# quality ("q") value as text.
# for explanation of "media-range", etc. see Sections 5.3.{1,2} of RFC 7231
_accept_re = re.compile(
    r"""
    (                       # media-range capturing-parenthesis
      [^\s;,]+              # type/subtype
      (?:[ \t]*;[ \t]*      # ";"
        (?:                 # parameter non-capturing-parenthesis
          [^\s;,q][^\s;,]*  # token that doesn't start with "q"
        |                   # or
          q[^\s;,=][^\s;,]* # token that is more than just "q"
        )
      )*                    # zero or more parameters
    )                       # end of media-range
    (?:[ \t]*;[ \t]*q=      # weight is a "q" parameter
      (\d*(?:\.\d+)?)       # qvalue capturing-parentheses
      [^,]*                 # "extension" accept params: who cares?
    )?                      # accept params are optional
    """,
    re.VERBOSE,
)
# Characters a header value may consist of and still be emitted unquoted by
# ``quote_header_value`` (when ``allow_token`` is true).
_token_chars = frozenset(
    "!#$%&'*+-.0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ^_`abcdefghijklmnopqrstuvwxyz|~"
)
# One entity tag followed by a comma separator or the end of the string, as
# consumed by ``parse_etags``.  Groups: optional weak marker ("W/" or "w/"),
# the content of a quoted tag, the content of an unquoted tag.
_etag_re = re.compile(r'([Ww]/)?(?:"(.*?)"|(.*?))(?:\s*,\s*|$)')
# NOTE(review): not referenced in this part of the file; by its name these
# are characters considered unsafe in header values -- confirm at the use
# site.
_unsafe_header_chars = set('()<>@,;:"/[]?={} \t')
# Matches one ``; key[=value]`` piece of an options header for
# ``parse_options_header``.  Named groups: ``key``, ``count`` (continuation
# index after "*"), ``encoding`` and ``language`` (extended "*=" notation)
# and ``value``.
_option_header_piece_re = re.compile(
    r"""
    ;\s*,?\s*  # newlines were replaced with commas
    (?P<key>
        "[^"\\]*(?:\\.[^"\\]*)*"  # quoted string
    |
        [^\s;,=*]+  # token
    )
    (?:\*(?P<count>\d+))?  # *1, optional continuation index
    \s*
    (?:  # optionally followed by =value
        (?:  # equals sign, possibly with encoding
            \*\s*=\s*  # * indicates extended notation
            (?:  # optional encoding
                (?P<encoding>[^\s]+?)
                '(?P<language>[^\s]*?)'
            )?
        |
            =\s*  # basic notation
        )
        (?P<value>
            "[^"\\]*(?:\\.[^"\\]*)*"  # quoted string
        |
            [^;,]+  # token
        )?
    )?
    \s*
    """,
    flags=re.VERBOSE,
)
# Matches a leading "," followed by the mimetype (group 1) and whatever
# follows it, starting at the next ";" or "," (group 2).
# ``parse_options_header`` prepends a "," to the header so that the first
# mimetype matches as well.
_option_header_start_mime_type = re.compile(r",\s*([^;,\s]+)([;,]\s*.+)?")
# Lower-cased names of entity headers.
# NOTE(review): not referenced in this part of the file; presumably consulted
# by ``is_entity_header`` -- confirm.
_entity_headers = frozenset(
    [
        "allow",
        "content-encoding",
        "content-language",
        "content-length",
        "content-location",
        "content-md5",
        "content-range",
        "content-type",
        "expires",
        "last-modified",
    ]
)
# Lower-cased names of HTTP/1.1 hop-by-hop headers.
# NOTE(review): not referenced in this part of the file; presumably consulted
# by the hop-by-hop helpers further down -- confirm.
_hop_by_hop_headers = frozenset(
    [
        "connection",
        "keep-alive",
        "proxy-authenticate",
        "proxy-authorization",
        "te",
        "trailer",
        "transfer-encoding",
        "upgrade",
    ]
)
# Maps numeric HTTP status codes to their reason phrases.
HTTP_STATUS_CODES = {
    100: "Continue",
    101: "Switching Protocols",
    102: "Processing",
    103: "Early Hints",  # see RFC 8297
    200: "OK",
    201: "Created",
    202: "Accepted",
    203: "Non Authoritative Information",
    204: "No Content",
    205: "Reset Content",
    206: "Partial Content",
    207: "Multi Status",
    208: "Already Reported",  # see RFC 5842
    226: "IM Used",  # see RFC 3229
    300: "Multiple Choices",
    301: "Moved Permanently",
    302: "Found",
    303: "See Other",
    304: "Not Modified",
    305: "Use Proxy",
    306: "Switch Proxy",  # unused
    307: "Temporary Redirect",
    308: "Permanent Redirect",
    400: "Bad Request",
    401: "Unauthorized",
    402: "Payment Required",  # unused
    403: "Forbidden",
    404: "Not Found",
    405: "Method Not Allowed",
    406: "Not Acceptable",
    407: "Proxy Authentication Required",
    408: "Request Timeout",
    409: "Conflict",
    410: "Gone",
    411: "Length Required",
    412: "Precondition Failed",
    413: "Request Entity Too Large",
    414: "Request URI Too Long",
    415: "Unsupported Media Type",
    416: "Requested Range Not Satisfiable",
    417: "Expectation Failed",
    418: "I'm a teapot",  # see RFC 2324
    421: "Misdirected Request",  # see RFC 7540
    422: "Unprocessable Entity",
    423: "Locked",
    424: "Failed Dependency",
    425: "Too Early",  # see RFC 8470
    426: "Upgrade Required",
    428: "Precondition Required",  # see RFC 6585
    429: "Too Many Requests",
    431: "Request Header Fields Too Large",
    449: "Retry With",  # proprietary MS extension
    451: "Unavailable For Legal Reasons",
    500: "Internal Server Error",
    501: "Not Implemented",
    502: "Bad Gateway",
    503: "Service Unavailable",
    504: "Gateway Timeout",
    505: "HTTP Version Not Supported",
    506: "Variant Also Negotiates",  # see RFC 2295
    507: "Insufficient Storage",
    508: "Loop Detected",  # see RFC 5842
    510: "Not Extended",
    # RFC 6585 names this status "Network Authentication Required".
    511: "Network Authentication Required",  # see RFC 6585
}
def wsgi_to_bytes(data):
    """Coerce a WSGI "bytes represented as unicode" string to real bytes.

    :param data: a bytes object or a string encodable as latin1.
    :return: ``data`` unchanged if it already is bytes, otherwise its
        latin1 encoding.
    """
    already_bytes = isinstance(data, bytes)
    # XXX: utf8 fallback?
    return data if already_bytes else data.encode("latin1")
def bytes_to_wsgi(data):
    """Convert real bytes to the native string form used in WSGI.

    :param data: a bytes object; anything else trips the assertion.
    :return: ``data`` itself when bytes are already the native ``str``
        type, otherwise ``data`` decoded as latin1.
    """
    assert isinstance(data, bytes), "data must be bytes"
    # Where ``bytes`` is the same type as ``str`` nothing needs decoding.
    if not isinstance(data, str):
        return data.decode("latin1")
    return data
def quote_header_value(value, extra_chars="", allow_token=True):
    """Quote a header value if necessary.

    .. versionadded:: 0.5

    :param value: the value to quote.  Bytes are converted with
        :func:`bytes_to_wsgi`, everything else with ``str()``.
    :param extra_chars: a list of extra characters to skip quoting.
    :param allow_token: if this is enabled token values are returned
                        unchanged.
    :return: the value as is when it is an allowed token, otherwise the
        value wrapped in double quotes with backslashes and double quotes
        backslash-escaped.
    """
    text = bytes_to_wsgi(value) if isinstance(value, bytes) else value
    text = str(text)
    # A value made up solely of token characters needs no quoting.
    if allow_token and set(text) <= (_token_chars | set(extra_chars)):
        return text
    escaped = text.replace("\\", "\\\\").replace('"', '\\"')
    return '"%s"' % escaped
def unquote_header_value(value, is_filename=False):
    r"""Unquotes a header value.  (Reversal of :func:`quote_header_value`).
    This does not use the real unquoting but what browsers are actually
    using for quoting.

    .. versionadded:: 0.5

    :param value: the header value to unquote.
    :param is_filename: set when the value is a filename; a quoted value
        whose content starts with two backslashes (a UNC path) then only
        loses its surrounding quotes.
    :return: the unquoted value, or ``value`` unchanged when it is empty or
        not enclosed in double quotes.
    """
    if not value or not (value[0] == value[-1] == '"'):
        return value

    # this is not the real unquoting, but fixing this so that the
    # RFC is met will result in bugs with internet explorer and
    # probably some other browsers as well.  IE for example is
    # uploading files with "C:\foo\bar.txt" as filename
    inner = value[1:-1]

    # Running the replacements below on a UNC path would collapse the
    # leading double backslash into a single one, after which
    # _fix_ie_filename() doesn't work correctly.  See #458.
    if is_filename and inner[:2] == "\\\\":
        return inner
    return inner.replace("\\\\", "\\").replace('\\"', '"')
def dump_options_header(header, options):
    """The reverse function to :func:`parse_options_header`.

    :param header: the header to dump; left out of the result when `None`.
    :param options: a dict of options to append.  An option whose value is
        `None` is emitted as the bare key, any other value is passed
        through :func:`quote_header_value`.
    :return: the segments joined with ``"; "``.
    """
    parts = [] if header is None else [header]
    parts.extend(
        name if val is None else "%s=%s" % (name, quote_header_value(val))
        for name, val in iteritems(options)
    )
    return "; ".join(parts)
def dump_header(iterable, allow_token=True):
    """Dump an HTTP header again.  This is the reversal of
    :func:`parse_list_header`, :func:`parse_set_header` and
    :func:`parse_dict_header`.  This also quotes strings that include an
    equals sign unless you pass it as dict of key, value pairs.

    >>> dump_header({'foo': 'bar baz'})
    'foo="bar baz"'
    >>> dump_header(('foo', 'bar baz'))
    'foo, "bar baz"'

    :param iterable: the iterable or dict of values to quote.
    :param allow_token: if set to `False` tokens as values are disallowed.
                        See :func:`quote_header_value` for more details.
    :return: the items joined with ``", "``.
    """

    def _quote(item):
        return quote_header_value(item, allow_token=allow_token)

    if not isinstance(iterable, dict):
        return ", ".join([_quote(entry) for entry in iterable])

    parts = []
    for name, val in iteritems(iterable):
        # Keys without a value are emitted bare.
        parts.append(name if val is None else "%s=%s" % (name, _quote(val)))
    return ", ".join(parts)
def dump_csp_header(header):
    """Dump a Content Security Policy header.

    These are structured into policies such as "default-src 'self';
    script-src 'self'".

    .. versionadded:: 1.0.0
       Support for Content Security Policy headers was added.

    :param header: a mapping of directive to policy value.
    :return: each pair rendered as ``"directive value"``, joined with
        ``"; "``.
    """
    policies = []
    for directive, policy_value in iteritems(header):
        policies.append("%s %s" % (directive, policy_value))
    return "; ".join(policies)
def parse_list_header(value):
    """Parse lists as described by RFC 2068 Section 2.

    In particular, parse comma-separated lists where the elements of
    the list may include quoted-strings.  A quoted-string could
    contain a comma.  A non-quoted string could have quotes in the
    middle.  Quotes are removed automatically after parsing.

    It basically works like :func:`parse_set_header` just that items
    may appear multiple times and case sensitivity is preserved.

    The return value is a standard :class:`list`:

    >>> parse_list_header('token, "quoted value"')
    ['token', 'quoted value']

    To create a header from the :class:`list` again, use the
    :func:`dump_header` function.

    :param value: a string with a list header.
    :return: :class:`list`
    """

    def _strip_quotes(element):
        # Only elements enclosed in double quotes are unquoted.
        if element[:1] == element[-1:] == '"':
            return unquote_header_value(element[1:-1])
        return element

    return [_strip_quotes(element) for element in _parse_list_header(value)]
def parse_dict_header(value, cls=dict):
    """Parse lists of key, value pairs as described by RFC 2068 Section 2 and
    convert them into a python dict (or any other mapping object created from
    the type with a dict like interface provided by the `cls` argument):

    >>> d = parse_dict_header('foo="is a fish", bar="as well"')
    >>> type(d) is dict
    True
    >>> sorted(d.items())
    [('bar', 'as well'), ('foo', 'is a fish')]

    If there is no value for a key it will be `None`:

    >>> parse_dict_header('key_without_value')
    {'key_without_value': None}

    To create a header from the :class:`dict` again, use the
    :func:`dump_header` function.

    .. versionchanged:: 0.9
       Added support for `cls` argument.

    :param value: a string with a dict header.
    :param cls: callable to use for storage of parsed results.
    :return: an instance of `cls`
    """
    parsed = cls()
    # XXX: validate
    header = value if isinstance(value, text_type) else bytes_to_wsgi(value)
    for entry in _parse_list_header(header):
        name, separator, raw = entry.partition("=")
        if not separator:
            # A bare key without "=" maps to None.
            parsed[entry] = None
        else:
            if raw[:1] == raw[-1:] == '"':
                raw = unquote_header_value(raw[1:-1])
            parsed[name] = raw
    return parsed
def parse_options_header(value, multiple=False):
    """Parse a ``Content-Type`` like header into a tuple with the content
    type and the options:

    >>> parse_options_header('text/html; charset=utf8')
    ('text/html', {'charset': 'utf8'})

    This should not be used to parse ``Cache-Control`` like headers that use
    a slightly different format.  For these headers use the
    :func:`parse_dict_header` function.

    .. versionchanged:: 0.15
        :rfc:`2231` parameter continuations are handled.

    .. versionadded:: 0.5

    :param value: the header to parse.  A falsy value yields ``("", {})``.
    :param multiple: Whether try to parse and return multiple MIME types
    :return: (mimetype, options) or (mimetype, options, mimetype, options, …)
             if multiple=True
    """
    if not value:
        return "", {}

    result = []

    # The mimetype pattern expects a leading comma, so one is prepended and
    # newlines are turned into commas as well.
    value = "," + value.replace("\n", ",")
    while value:
        match = _option_header_start_mime_type.match(value)
        if not match:
            break
        result.append(match.group(1))  # mimetype
        options = {}
        # Parse options
        rest = match.group(2)
        continued_encoding = None
        while rest:
            optmatch = _option_header_piece_re.match(rest)
            if not optmatch:
                break
            option, count, encoding, language, option_value = optmatch.groups()
            # Continuations don't have to supply the encoding after the
            # first line. If we're in a continuation, track the current
            # encoding to use for subsequent lines. Reset it when the
            # continuation ends.
            if not count:
                continued_encoding = None
            else:
                if not encoding:
                    encoding = continued_encoding
                continued_encoding = encoding
            option = unquote_header_value(option)
            if option_value is not None:
                # The filename flag is passed so that quoted UNC paths are
                # left alone by the unquoting.
                option_value = unquote_header_value(option_value, option == "filename")
                if encoding is not None:
                    # Extended notation: percent-decode, then decode the
                    # bytes with the declared encoding.
                    option_value = _unquote(option_value).decode(encoding)
            if count:
                # Continuations append to the existing value. For
                # simplicity, this ignores the possibility of
                # out-of-order indices, which shouldn't happen anyway.
                # NOTE(review): a continuation piece without a value leaves
                # option_value as None here, and the concatenation would
                # raise TypeError -- confirm whether that can occur.
                options[option] = options.get(option, "") + option_value
            else:
                options[option] = option_value
            rest = rest[optmatch.end() :]
        result.append(options)
        # Without ``multiple`` only the first mimetype and its options are
        # returned.
        if multiple is False:
            return tuple(result)
        # Continue with whatever the option loop could not consume.
        value = rest

    return tuple(result) if result else ("", {})
def parse_accept_header(value, cls=None):
    """Parses an HTTP Accept-* header.  This does not implement a complete
    valid algorithm but one that supports at least value and quality
    extraction.

    Returns a new :class:`Accept` object (basically a list of
    ``(value, quality)`` tuples sorted by the quality with some additional
    accessor methods).

    The second parameter can be a subclass of :class:`Accept` that is created
    with the parsed values and returned.

    :param value: the accept header string to be parsed.
    :param cls: the wrapper class for the return value (can be
                :class:`Accept` or a subclass thereof)
    :return: an instance of `cls`.
    """
    accept_cls = Accept if cls is None else cls
    if not value:
        return accept_cls(None)

    def _weight(raw):
        # A missing quality counts as 1; given ones are clamped to [0, 1].
        if not raw:
            return 1
        return max(min(float(raw), 1), 0)

    pairs = [
        (found.group(1), _weight(found.group(2)))
        for found in _accept_re.finditer(value)
    ]
    return accept_cls(pairs)
def parse_cache_control_header(value, on_update=None, cls=None):
    """Parse a cache control header.  The RFC differs between response and
    request cache control, this method does not.  It's your responsibility
    to not use the wrong control statements.

    .. versionadded:: 0.5
       The `cls` was added.  If not specified an immutable
       :class:`~werkzeug.datastructures.RequestCacheControl` is returned.

    :param value: a cache control header to be parsed.
    :param on_update: an optional callable that is called every time a value
                      on the :class:`~werkzeug.datastructures.CacheControl`
                      object is changed.
    :param cls: the class for the returned object.  By default
                :class:`~werkzeug.datastructures.RequestCacheControl` is used.
    :return: a `cls` object.
    """
    factory = RequestCacheControl if cls is None else cls
    # A missing or empty header is handed to the class as None.
    directives = parse_dict_header(value) if value else None
    return factory(directives, on_update)
def parse_csp_header(value, on_update=None, cls=None):
    """Parse a Content Security Policy header.

    .. versionadded:: 1.0.0
       Support for Content Security Policy headers was added.

    :param value: a csp header to be parsed.
    :param on_update: an optional callable that is called every time a value
                      on the object is changed.
    :param cls: the class for the returned object.  By default
                :class:`~werkzeug.datastructures.ContentSecurityPolicy` is
                used.
    :return: a `cls` object.
    """
    factory = ContentSecurityPolicy if cls is None else cls
    if value is None:
        return factory(None, on_update)

    pairs = []
    for policy in (chunk.strip() for chunk in value.split(";")):
        # Ignore badly formatted policies (no space)
        if " " not in policy:
            continue
        directive, _, policy_value = policy.partition(" ")
        pairs.append((directive.strip(), policy_value.strip()))
    return factory(pairs, on_update)
def parse_set_header(value, on_update=None):
    """Parse a set-like header and return a
    :class:`~werkzeug.datastructures.HeaderSet` object:

    >>> hs = parse_set_header('token, "quoted value"')

    The return value is an object that treats the items case-insensitively
    and keeps the order of the items:

    >>> 'TOKEN' in hs
    True
    >>> hs.index('quoted value')
    1
    >>> hs
    HeaderSet(['token', 'quoted value'])

    To create a header from the :class:`HeaderSet` again, use the
    :func:`dump_header` function.

    :param value: a set header to be parsed.
    :param on_update: an optional callable that is called every time a
                      value on the :class:`~werkzeug.datastructures.HeaderSet`
                      object is changed.
    :return: a :class:`~werkzeug.datastructures.HeaderSet`
    """
    # A missing or empty header is handed to HeaderSet as None.
    items = parse_list_header(value) if value else None
    return HeaderSet(items, on_update)
def parse_authorization_header(value):
    """Parse an HTTP basic/digest authorization header transmitted by the web
    browser.  The return value is either `None` if the header was invalid or
    not given, otherwise an :class:`~werkzeug.datastructures.Authorization`
    object.

    :param value: the authorization header to parse.
    :return: a :class:`~werkzeug.datastructures.Authorization` object or
        `None`.
    """
    if not value:
        return None

    # The header has to consist of a scheme and the credentials.
    pieces = wsgi_to_bytes(value).split(None, 1)
    if len(pieces) != 2:
        return None
    scheme = pieces[0].lower()
    credentials = pieces[1]

    if scheme == b"basic":
        try:
            username, password = base64.b64decode(credentials).split(b":", 1)
        except Exception:
            return None
        decoded = {
            "username": to_unicode(username, _basic_auth_charset),
            "password": to_unicode(password, _basic_auth_charset),
        }
        return Authorization("basic", decoded)

    if scheme == b"digest":
        auth_map = parse_dict_header(credentials)
        required = ("username", "realm", "nonce", "uri", "response")
        if any(field not in auth_map for field in required):
            return None
        # With "qop" present both "nc" and "cnonce" must be non-empty.
        if "qop" in auth_map and not (
            auth_map.get("nc") and auth_map.get("cnonce")
        ):
            return None
        return Authorization("digest", auth_map)

    return None
def parse_www_authenticate_header(value, on_update=None):
    """Parse an HTTP WWW-Authenticate header into a
    :class:`~werkzeug.datastructures.WWWAuthenticate` object.

    :param value: a WWW-Authenticate header to parse.
    :param on_update: an optional callable that is called every time a value
                      on the :class:`~werkzeug.datastructures.WWWAuthenticate`
                      object is changed.
    :return: a :class:`~werkzeug.datastructures.WWWAuthenticate` object.
    """
    if not value:
        return WWWAuthenticate(on_update=on_update)

    try:
        scheme, params = value.split(None, 1)
        scheme = scheme.lower()
    except (ValueError, AttributeError):
        # No parameters follow the scheme: the whole value is the type.
        scheme_only = value.strip().lower()
        return WWWAuthenticate(scheme_only, on_update=on_update)
    else:
        return WWWAuthenticate(scheme, parse_dict_header(params), on_update)
def parse_if_range_header(value):
    """Parses an if-range header which can be an etag or a date.  Returns
    a :class:`~werkzeug.datastructures.IfRange` object.

    .. versionadded:: 0.7

    :param value: the if-range header to parse; a falsy value gives an
        empty :class:`~werkzeug.datastructures.IfRange`.
    """
    if value:
        parsed = parse_date(value)
        if parsed is None:
            # Not a date, so treat it as an etag and
            # drop weakness information
            etag, _weak = unquote_etag(value)
            return IfRange(etag)
        return IfRange(date=parsed)
    return IfRange()
def parse_range_header(value, make_inclusive=True):
    """Parses a range header into a :class:`~werkzeug.datastructures.Range`
    object.  If the header is missing or malformed `None` is returned.
    `ranges` is a list of ``(start, stop)`` tuples where the ranges are
    non-inclusive.

    A suffix range (``-N``) is stored as ``(-N, None)`` and an open-ended
    range (``N-``) as ``(N, None)``.  Nothing may follow either of them, and
    a range may not begin before the end of the previous one; such headers
    are treated as malformed.

    .. versionadded:: 0.7

    :param value: the range header to parse.
    :param make_inclusive: accepted for backwards compatibility; it is not
        consulted by this function.
    :return: a :class:`~werkzeug.datastructures.Range` object or `None`.
    """
    if not value or "=" not in value:
        return None

    ranges = []
    # Exclusive end of the previous range, or -1 once a range without an
    # upper bound was seen (no further range is acceptable after that).
    last_end = 0
    units, rng = value.split("=", 1)
    units = units.strip().lower()

    for item in rng.split(","):
        item = item.strip()
        if "-" not in item:
            return None
        if item.startswith("-"):
            # Suffix range: the last N bytes.
            if last_end < 0:
                return None
            try:
                begin = int(item)
            except ValueError:
                return None
            end = None
            last_end = -1
        else:
            begin, end = item.split("-", 1)
            begin = begin.strip()
            end = end.strip()
            if not begin.isdigit():
                return None
            begin = int(begin)
            if begin < last_end or last_end < 0:
                return None
            if end:
                if not end.isdigit():
                    return None
                # Convert the inclusive header value to an exclusive stop.
                end = int(end) + 1
                if begin >= end:
                    return None
            else:
                end = None
            # Keep ``last_end`` an integer: storing None here would make the
            # comparisons above raise TypeError for the next item instead of
            # rejecting the header.
            last_end = -1 if end is None else end
        ranges.append((begin, end))

    return Range(units, ranges)
def parse_content_range_header(value, on_update=None):
    """Parses a range header into a
    :class:`~werkzeug.datastructures.ContentRange` object or `None` if
    parsing is not possible.

    .. versionadded:: 0.7

    :param value: a content range header to be parsed.
    :param on_update: an optional callable that is called every time a value
                      on the :class:`~werkzeug.datastructures.ContentRange`
                      object is changed.
    """
    if value is None:
        return None

    # The header must consist of the units and the range definition.
    parts = (value or "").strip().split(None, 1)
    if len(parts) != 2:
        return None
    units, rangedef = parts

    rng, slash, length = rangedef.partition("/")
    if not slash:
        return None

    # The total length is either "*" (unknown) or a plain number.
    if length == "*":
        length = None
    elif length.isdigit():
        length = int(length)
    else:
        return None

    if rng == "*":
        return ContentRange(units, None, None, length, on_update=on_update)

    start, dash, stop = rng.partition("-")
    if not dash:
        return None
    try:
        start = int(start)
        # The header carries an inclusive end; store it as exclusive.
        stop = int(stop) + 1
    except ValueError:
        return None

    if not is_byte_range_valid(start, stop, length):
        return None
    return ContentRange(units, start, stop, length, on_update=on_update)
def quote_etag(etag, weak=False):
    """Quote an etag.

    :param etag: the etag to quote.
    :param weak: set to `True` to tag it "weak".
    :return: the etag in double quotes, prefixed with ``W/`` if weak.
    :raises ValueError: if the etag contains a double quote.
    """
    if '"' in etag:
        raise ValueError("invalid etag")
    quoted = '"%s"' % etag
    return ("W/" + quoted) if weak else quoted
def unquote_etag(etag):
    """Unquote a single etag:

    >>> unquote_etag('W/"bar"')
    ('bar', True)
    >>> unquote_etag('"bar"')
    ('bar', False)

    :param etag: the etag identifier to unquote.
    :return: a ``(etag, weak)`` tuple; ``(None, None)`` for a falsy etag.
    """
    if not etag:
        return None, None

    tag = etag.strip()
    # A "W/" prefix (in either case) marks the tag as weak.
    weak = tag.startswith(("W/", "w/"))
    if weak:
        tag = tag[2:]
    if tag[:1] == tag[-1:] == '"':
        tag = tag[1:-1]
    return tag, weak
def parse_etags(value):
    """Parse an etag header.

    :param value: the tag header to parse
    :return: an :class:`~werkzeug.datastructures.ETags` object.
    """
    if not value:
        return ETags()

    strong, weak = [], []
    length = len(value)
    cursor = 0
    while cursor < length:
        found = _etag_re.match(value, cursor)
        if found is None:
            break
        weak_marker, quoted, raw = found.groups()
        # An unquoted "*" stands for any tag and ends the parsing.
        if raw == "*":
            return ETags(star_tag=True)
        tag = quoted if quoted else raw
        (weak if weak_marker else strong).append(tag)
        cursor = found.end()
    return ETags(strong, weak)
def generate_etag(data):
    """Generate an etag for some data.

    :param data: the bytes to hash.
    :return: the hexadecimal MD5 digest of ``data``.
    """
    digest = md5(data)
    return digest.hexdigest()
def parse_date(value):
    """Parse one of the following date formats into a datetime object:

    .. sourcecode:: text

        Sun, 06 Nov 1994 08:49:37 GMT  ; RFC 822, updated by RFC 1123
        Sunday, 06-Nov-94 08:49:37 GMT ; RFC 850, obsoleted by RFC 1036
        Sun Nov  6 08:49:37 1994       ; ANSI C's asctime() format

    If parsing fails the return value is `None`.

    :param value: a string with a supported date format.
    :return: a :class:`datetime.datetime` object with the timezone offset
        of the input subtracted, or `None`.
    """
    if not value:
        return None
    parsed = parsedate_tz(value.strip())
    if parsed is None:
        return None

    try:
        year = parsed[0]
        # unfortunately that function does not tell us if two digit
        # years were part of the string, or if they were prefixed
        # with two zeroes.  So what we do is to assume that 69-99
        # refer to 1900, and everything below to 2000
        if 0 <= year <= 68:
            year += 2000
        elif 69 <= year <= 99:
            year += 1900
        moment = datetime(year, *parsed[1:7])
        return moment - timedelta(seconds=parsed[-1] or 0)
    except (ValueError, OverflowError):
        return None
def _dump_date(d, delim):
    """Used for `http_date` and `cookie_date`.

    :param d: `None` for the current time, a datetime object, a number of
        seconds since the epoch, or an object with the ``tm_*`` attributes
        of a time tuple.
    :param delim: the separator placed between day, month and year.
    :return: the date as ``Wdy, DD<delim>Mon<delim>YYYY HH:MM:SS GMT``.
    """
    day_names = ("Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun")
    month_names = (
        "Jan",
        "Feb",
        "Mar",
        "Apr",
        "May",
        "Jun",
        "Jul",
        "Aug",
        "Sep",
        "Oct",
        "Nov",
        "Dec",
    )

    # Normalize every supported input to a time tuple.
    if d is None:
        d = gmtime()
    elif isinstance(d, datetime):
        d = d.utctimetuple()
    elif isinstance(d, (integer_types, float)):
        d = gmtime(d)

    fields = (
        day_names[d.tm_wday],
        d.tm_mday,
        delim,
        month_names[d.tm_mon - 1],
        delim,
        d.tm_year,
        d.tm_hour,
        d.tm_min,
        d.tm_sec,
    )
    return "%s, %02d%s%s%s%04d %02d:%02d:%02d GMT" % fields
def cookie_date(expires=None):
    """Formats the time to ensure compatibility with Netscape's cookie
    standard.

    Accepts a floating point number expressed in seconds since the epoch in, a
    datetime object or a timetuple.  All times in UTC.  The :func:`parse_date`
    function can be used to parse such a date.

    Outputs a string in the format ``Wdy, DD-Mon-YYYY HH:MM:SS GMT``.

    :param expires: If provided that date is used, otherwise the current.
    """
    formatted = _dump_date(expires, delim="-")
    return formatted
def http_date(timestamp=None):
    """Formats the time to match the RFC1123 date format.

    Accepts a floating point number expressed in seconds since the epoch in, a
    datetime object or a timetuple.  All times in UTC.  The :func:`parse_date`
    function can be used to parse such a date.

    Outputs a string in the format ``Wdy, DD Mon YYYY HH:MM:SS GMT``.

    :param timestamp: If provided that date is used, otherwise the current.
    """
    formatted = _dump_date(timestamp, delim=" ")
    return formatted
def parse_age(value=None):
    """Parses a base-10 integer count of seconds into a timedelta.

    If parsing fails, the return value is `None`.

    :param value: a string consisting of an integer represented in base-10
    :return: a :class:`datetime.timedelta` object or `None`.
    """
    if not value:
        return None
    try:
        seconds = int(value)
    except ValueError:
        return None

    # Negative ages are rejected; ones too large for a timedelta as well.
    if seconds >= 0:
        try:
            return timedelta(seconds=seconds)
        except OverflowError:
            pass
    return None
def dump_age(age=None):
    """Formats the duration as a base-10 integer.

    :param age: should be an integer number of seconds,
                a :class:`datetime.timedelta` object, or,
                if the age is unknown, `None` (default).
    :return: the number of seconds as a string, or `None` if ``age`` is
        `None`.
    :raises ValueError: if the age is negative.
    """
    if age is None:
        return None

    if isinstance(age, timedelta):
        # do the equivalent of Python 2.7's timedelta.total_seconds(),
        # but disregarding fractional seconds
        seconds = age.days * 24 * 3600 + age.seconds
    else:
        seconds = int(age)

    if seconds < 0:
        raise ValueError("age cannot be negative")
    return str(seconds)
def is_resource_modified(
    environ, etag=None, data=None, last_modified=None, ignore_if_range=True
):
    """Convenience method for conditional requests.

    The ``If-Modified-Since`` (or ``If-Range`` date) check runs first; when
    an etag is available, the etag checks that apply afterwards overwrite
    its result.

    :param environ: the WSGI environment of the request to be checked.
    :param etag: the etag for the response for comparison.
    :param data: or alternatively the data of the response to automatically
                 generate an etag using :func:`generate_etag`.
    :param last_modified: an optional date of the last modification; a
                          string is parsed with :func:`parse_date`.
    :param ignore_if_range: If `False`, `If-Range` header will be taken into
                            account.
    :return: `True` if the resource was modified, otherwise `False`.
    :raises TypeError: if both ``etag`` and ``data`` are given.

    .. versionchanged:: 1.0.0
        The check is run for methods other than ``GET`` and ``HEAD``.
    """
    if etag is None and data is not None:
        etag = generate_etag(data)
    elif data is not None:
        raise TypeError("both data and etag given")

    unmodified = False
    if isinstance(last_modified, string_types):
        last_modified = parse_date(last_modified)

    # ensure that microsecond is zero because the HTTP spec does not transmit
    # that either and we might have some false positives.  See issue #39
    if last_modified is not None:
        last_modified = last_modified.replace(microsecond=0)

    if_range = None
    if not ignore_if_range and "HTTP_RANGE" in environ:
        # https://tools.ietf.org/html/rfc7233#section-3.2
        # A server MUST ignore an If-Range header field received in a request
        # that does not contain a Range header field.
        if_range = parse_if_range_header(environ.get("HTTP_IF_RANGE"))

    # A date supplied through If-Range takes the place of If-Modified-Since.
    if if_range is not None and if_range.date is not None:
        modified_since = if_range.date
    else:
        modified_since = parse_date(environ.get("HTTP_IF_MODIFIED_SINCE"))

    if modified_since and last_modified and last_modified <= modified_since:
        unmodified = True

    if etag:
        # Only the bare tag is compared; the weakness flag is discarded.
        etag, _ = unquote_etag(etag)
        if if_range is not None and if_range.etag is not None:
            unmodified = parse_etags(if_range.etag).contains(etag)
        else:
            if_none_match = parse_etags(environ.get("HTTP_IF_NONE_MATCH"))
            if if_none_match:
                # https://tools.ietf.org/html/rfc7232#section-3.2
                # "A recipient MUST use the weak comparison function when comparing
                # entity-tags for If-None-Match"
                unmodified = if_none_match.contains_weak(etag)

            # https://tools.ietf.org/html/rfc7232#section-3.1
            # "Origin server MUST use the strong comparison function when
            # comparing entity-tags for If-Match"
            if_match = parse_etags(environ.get("HTTP_IF_MATCH"))
            if if_match:
                unmodified = not if_match.is_strong(etag)

    return not unmodified
def remove_entity_headers(headers, allowed=("expires", "content-location")):
    """Strip entity headers from a list or :class:`Headers` object in-place.

    `Expires` and `Content-Location` are kept by default; :rfc:`2616`
    section 10.3.5 specifies some entity headers that should be sent.

    .. versionchanged:: 0.5
       added `allowed` parameter.

    :param headers: a list or :class:`Headers` object.
    :param allowed: a list of headers that should still be allowed even though
                    they are entity headers.
    """
    exempt = {name.lower() for name in allowed}

    retained = []
    for key, value in headers:
        if is_entity_header(key) and key.lower() not in exempt:
            continue
        retained.append((key, value))

    # Slice assignment keeps the caller's object and replaces its contents.
    headers[:] = retained
def remove_hop_by_hop_headers(headers):
    """Strip all HTTP/1.1 "Hop-by-Hop" headers from a list or
    :class:`Headers` object.  The object is modified in-place.

    .. versionadded:: 0.5

    :param headers: a list or :class:`Headers` object.
    """
    retained = []
    for key, value in headers:
        if is_hop_by_hop_header(key):
            continue
        retained.append((key, value))

    # Slice assignment keeps the caller's object and replaces its contents.
    headers[:] = retained
def is_entity_header(header):
    """Tell whether a header name denotes an entity header.

    .. versionadded:: 0.5

    :param header: the header to test.
    :return: `True` if it's an entity header, `False` otherwise.
    """
    # The lookup is done on the lower-cased name.
    normalized = header.lower()
    return normalized in _entity_headers
def is_hop_by_hop_header(header):
    """Tell whether a header name denotes an HTTP/1.1 "Hop-by-Hop" header.

    .. versionadded:: 0.5

    :param header: the header to test.
    :return: `True` if it's an HTTP/1.1 "Hop-by-Hop" header, `False` otherwise.
    """
    # The lookup is done on the lower-cased name.
    normalized = header.lower()
    return normalized in _hop_by_hop_headers
def parse_cookie(header, charset="utf-8", errors="replace", cls=None):
    """Parse a cookie from a string or WSGI environ.

    The same key can be provided multiple times, the values are stored
    in-order. The default :class:`MultiDict` will have the first value
    first, and all values can be retrieved with
    :meth:`MultiDict.getlist`.

    :param header: The cookie header as a string, or a WSGI environ dict
        with a ``HTTP_COOKIE`` key.
    :param charset: The charset for the cookie values.
    :param errors: The error behavior for the charset decoding.
    :param cls: A dict-like class to store the parsed cookies in.
        Defaults to :class:`MultiDict`.
    :return: an instance of ``cls`` built from the decoded
        ``(key, value)`` pairs; pairs whose decoded key is empty are
        dropped.

    .. versionchanged:: 1.0.0
        Returns a :class:`MultiDict` instead of a
        ``TypeConversionDict``.

    .. versionchanged:: 0.5
       Returns a :class:`TypeConversionDict` instead of a regular dict.
       The ``cls`` parameter was added.
    """
    if header is None:
        raw = ""
    elif isinstance(header, dict):
        raw = header.get("HTTP_COOKIE", "")
    else:
        raw = header

    # On Python 3, PEP 3333 sends headers through the environ as latin1
    # decoded strings. Encode strings back to bytes for parsing.
    if isinstance(raw, text_type):
        raw = raw.encode("latin1", "replace")

    container = MultiDict if cls is None else cls

    def _decode(chunk):
        return to_unicode(chunk, charset, errors, allow_none_charset=True)

    def _iter_cookies():
        for raw_key, raw_val in _cookie_parse_impl(raw):
            name = _decode(raw_key)
            if name:
                decoded_val = _decode(raw_val)
                yield try_coerce_native(name), decoded_val

    return container(_iter_cookies())
def dump_cookie(
    key,
    value="",
    max_age=None,
    expires=None,
    path="/",
    domain=None,
    secure=False,
    httponly=False,
    charset="utf-8",
    sync_expires=True,
    max_size=4093,
    samesite=None,
):
    """Creates a new Set-Cookie header without the ``Set-Cookie`` prefix.

    The parameters are the same as in the cookie Morsel object in the
    Python standard library but it accepts unicode data, too.

    On Python 3 the return value of this function will be a unicode
    string, on Python 2 it will be a native string.  In both cases the
    return value is usually restricted to ascii as the vast majority of
    values are properly escaped, but that is no guarantee.  If a unicode
    string is returned it's tunneled through latin1 as required by
    PEP 3333.

    The return value is not ASCII safe if the key contains unicode
    characters.  This is technically against the specification but
    happens in the wild.  It's strongly recommended to not use
    non-ASCII values for the keys.

    :param key: the name of the cookie.
    :param value: the value of the cookie.
    :param max_age: should be a number of seconds, or `None` (default) if
                    the cookie should last only as long as the client's
                    browser session.  Additionally `timedelta` objects
                    are accepted, too.
    :param expires: should be a `datetime` object or unix timestamp.
    :param path: limits the cookie to a given path, per default it will
                 span the whole domain.
    :param domain: Use this if you want to set a cross-domain cookie. For
                   example, ``domain=".example.com"`` will set a cookie
                   that is readable by the domain ``www.example.com``,
                   ``foo.example.com`` etc. Otherwise, a cookie will only
                   be readable by the domain that set it.
    :param secure: The cookie will only be available via HTTPS
    :param httponly: disallow JavaScript to access the cookie.  This is an
                     extension to the cookie standard and probably not
                     supported by all browsers.
    :param charset: the encoding for unicode values.
    :param sync_expires: automatically set expires if max_age is defined
                         but expires not.
    :param max_size: Warn if the final header value exceeds this size. The
        default, 4093, should be safely `supported by most browsers
        <cookie_>`_. Set to 0 to disable this check.
    :param samesite: Limits the scope of the cookie such that it will
        only be attached to requests if those requests are same-site.
    :raises ValueError: if ``samesite`` is given and is not one of
        ``'Strict'``, ``'Lax'`` or ``'None'`` (compared after
        ``str.title()`` normalization).

    .. _`cookie`: http://browsercookielimits.squawky.net/

    .. versionchanged:: 1.0.0
        The string ``'None'`` is accepted for ``samesite``.
    """
    key = to_bytes(key, charset)
    value = to_bytes(value, charset)

    if path is not None:
        # Imported here rather than at module level.
        from .urls import iri_to_uri

        path = iri_to_uri(path, charset)

    domain = _make_cookie_domain(domain)

    if isinstance(max_age, timedelta):
        # Whole seconds only; the microseconds component is ignored.
        max_age = (max_age.days * 60 * 60 * 24) + max_age.seconds

    if expires is not None:
        if not isinstance(expires, string_types):
            expires = cookie_date(expires)
    elif max_age is not None and sync_expires:
        expires = to_bytes(cookie_date(time() + max_age))

    if samesite is not None:
        samesite = samesite.title()

        if samesite not in {"Strict", "Lax", "None"}:
            raise ValueError("SameSite must be 'Strict', 'Lax', or 'None'.")

    buf = [key + b"=" + _cookie_quote(value)]

    # XXX: In theory all of these parameters that are not marked with `None`
    # should be quoted.  Because stdlib did not quote it before I did not
    # want to introduce quoting there now.
    #
    # Third tuple item: ``None`` marks a flag attribute emitted without a
    # value, ``True`` a quoted value, ``False`` an unquoted value.
    for k, v, q in (
        (b"Domain", domain, True),
        (b"Expires", expires, False),
        (b"Max-Age", max_age, False),
        (b"Secure", secure, None),
        (b"HttpOnly", httponly, None),
        (b"Path", path, False),
        (b"SameSite", samesite, False),
    ):
        if q is None:
            if v:
                buf.append(k)
            continue

        if v is None:
            continue

        tmp = bytearray(k)
        if not isinstance(v, (bytes, bytearray)):
            v = to_bytes(text_type(v), charset)
        if q:
            v = _cookie_quote(v)
        tmp += b"=" + v
        buf.append(bytes(tmp))

    # The return value will be an incorrectly encoded latin1 header on
    # Python 3 for consistency with the headers object and a bytestring
    # on Python 2 because that's how the API makes more sense.
    rv = b"; ".join(buf)
    if not PY2:
        rv = rv.decode("latin1")

    # Warn if the final value of the cookie is larger than the limit. If the
    # cookie is too large, then it may be silently ignored by the browser,
    # which can be quite hard to debug.
    cookie_size = len(rv)

    if max_size and cookie_size > max_size:
        value_size = len(value)
        # ``key`` is bytes at this point.  On Python 3, formatting bytes
        # renders as ``b'name'`` inside the message, so decode it for
        # display.  On Python 2 the native byte string is kept because
        # formatting unicode into a byte-string template could raise for
        # non-ASCII keys.
        display_key = key if PY2 else key.decode(charset, "replace")
        warnings.warn(
            'The "{key}" cookie is too large: the value was {value_size} bytes'
            " but the header required {extra_size} extra bytes. The final size"
            " was {cookie_size} bytes but the limit is {max_size} bytes."
            " Browsers may silently ignore cookies larger than this.".format(
                key=display_key,
                value_size=value_size,
                extra_size=cookie_size - value_size,
                cookie_size=cookie_size,
                max_size=max_size,
            ),
            stacklevel=2,
        )

    return rv
def is_byte_range_valid(start, stop, length):
    """Checks if a given byte content range is valid for the given length.

    ``start`` and ``stop`` must either both be `None` or both be set.
    With no range, any non-negative (or unknown) ``length`` is valid.
    With a range, ``start`` must be non-negative and smaller than
    ``stop``, and smaller than ``length`` when the length is known.

    .. versionadded:: 0.7
    """
    start_missing = start is None
    stop_missing = stop is None

    # A half-specified range is never valid.
    if start_missing != stop_missing:
        return False

    if start_missing:
        return length is None or length >= 0

    if length is None:
        return 0 <= start < stop

    if start >= stop:
        return False

    return 0 <= start < length
- # circular dependencies
- from .datastructures import Accept
- from .datastructures import Authorization
- from .datastructures import ContentRange
- from .datastructures import ContentSecurityPolicy
- from .datastructures import ETags
- from .datastructures import HeaderSet
- from .datastructures import IfRange
- from .datastructures import MultiDict
- from .datastructures import Range
- from .datastructures import RequestCacheControl
- from .datastructures import WWWAuthenticate
|