12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155 |
- # -*- coding: utf-8 -*-
- """
- flask.helpers
- ~~~~~~~~~~~~~
- Implements various helpers.
- :copyright: 2010 Pallets
- :license: BSD-3-Clause
- """
- import io
- import mimetypes
- import os
- import pkgutil
- import posixpath
- import socket
- import sys
- import unicodedata
- from functools import update_wrapper
- from threading import RLock
- from time import time
- from zlib import adler32
- from jinja2 import FileSystemLoader
- from werkzeug.datastructures import Headers
- from werkzeug.exceptions import BadRequest
- from werkzeug.exceptions import NotFound
- from werkzeug.exceptions import RequestedRangeNotSatisfiable
- from werkzeug.routing import BuildError
- from werkzeug.urls import url_quote
- from werkzeug.wsgi import wrap_file
- from ._compat import fspath
- from ._compat import PY2
- from ._compat import string_types
- from ._compat import text_type
- from .globals import _app_ctx_stack
- from .globals import _request_ctx_stack
- from .globals import current_app
- from .globals import request
- from .globals import session
- from .signals import message_flashed
# sentinel
_missing = object()

# Which path separators does this operating system provide besides the
# forward slash?  send_from_directory relies on this list to make sure
# nobody is able to reach files outside of the intended directory.
_os_alt_seps = [
    separator
    for separator in (os.path.sep, os.path.altsep)
    if separator not in (None, "/")
]
def get_env():
    """Return the name of the environment the app is running in.

    The name is read from the :envvar:`FLASK_ENV` environment variable.
    When the variable is unset or empty, ``'production'`` is returned.
    """
    env_name = os.environ.get("FLASK_ENV")
    return env_name if env_name else "production"
def get_debug_flag():
    """Tell whether debug mode should be enabled for the app.

    The decision is taken from the :envvar:`FLASK_DEBUG` environment
    variable: any value other than ``0``, ``false`` or ``no`` (compared
    case-insensitively) enables debug mode.  When the variable is unset
    or empty, the result is ``True`` if :func:`.get_env` returns
    ``'development'`` and ``False`` otherwise.
    """
    raw = os.environ.get("FLASK_DEBUG")

    if raw:
        return raw.lower() not in ("0", "false", "no")

    # No explicit flag: debug mode follows the environment name.
    return get_env() == "development"
def get_load_dotenv(default=True):
    """Tell whether dotenv files should be loaded.

    The user can disable loading by setting
    :envvar:`FLASK_SKIP_DOTENV`.  Only the values ``0``, ``false`` and
    ``no`` (compared case-insensitively) keep loading enabled; any other
    non-empty value disables it.

    :param default: What to return if the env var isn't set (or is
        empty).  Defaults to ``True``, load the files.
    """
    skip = os.environ.get("FLASK_SKIP_DOTENV")

    if skip:
        return skip.lower() in ("0", "false", "no")

    return default
def _endpoint_from_view_func(view_func):
    """Internal helper that returns the default endpoint for a given
    function.  This always is the function name.

    :param view_func: the view function to derive the endpoint from.
    :raises AssertionError: if ``view_func`` is ``None``.
    """
    # Raised explicitly rather than through an ``assert`` statement so
    # that the check is still performed when Python runs with ``-O``.
    if view_func is None:
        raise AssertionError("expected view func if endpoint is not provided.")

    return view_func.__name__
def stream_with_context(generator_or_function):
    """Keep the request context around while a streamed response is
    being generated.

    Request contexts disappear when the response is started on the
    server.  This is done for efficiency reasons and to make it less
    likely to encounter memory leaks with badly written WSGI
    middlewares.  The downside is that if you are using streamed
    responses, the generator cannot access request bound information any
    more.

    This function however can help you keep the context around for
    longer::

        from flask import stream_with_context, request, Response

        @app.route('/stream')
        def streamed_response():
            @stream_with_context
            def generate():
                yield 'Hello '
                yield request.args['name']
                yield '!'
            return Response(generate())

    Alternatively it can also be used around a specific generator::

        from flask import stream_with_context, request, Response

        @app.route('/stream')
        def streamed_response():
            def generate():
                yield 'Hello '
                yield request.args['name']
                yield '!'
            return Response(stream_with_context(generate()))

    .. versionadded:: 0.9
    """
    try:
        gen = iter(generator_or_function)
    except TypeError:
        # Not iterable, so we are being used as a decorator: wrap the
        # callable and apply ourselves to whatever it returns.
        def decorator(*args, **kwargs):
            return stream_with_context(generator_or_function(*args, **kwargs))

        return update_wrapper(decorator, generator_or_function)

    def generator():
        ctx = _request_ctx_stack.top

        if ctx is None:
            raise RuntimeError(
                "Attempted to stream with context but "
                "there was no context in the first place to keep around."
            )

        with ctx:
            # Dummy sentinel.  Has to be inside the context block or we're
            # not actually keeping the context around.
            yield None

            # The try/finally makes sure the cleanup logic also runs when
            # a WSGI level iterator was passed in.  Generators don't need
            # that because they are closed on their destruction
            # automatically.
            try:
                for item in gen:
                    yield item
            finally:
                if hasattr(gen, "close"):
                    gen.close()

    # The trick is to start the generator.  Execution then runs until the
    # first dummy None is yielded, at which point the context was already
    # pushed.  That item is discarded here; when the iteration continues
    # the real generator is executed.
    primed = generator()
    next(primed)
    return primed
def make_response(*args):
    """Build a response object from the values a view may return.

    Sometimes it is necessary to set additional headers in a view.
    Because views do not have to return response objects but can return
    a value that is converted into a response object by Flask itself, it
    becomes tricky to add headers to it.  This function can be called
    instead of using a return and you will get a response object which
    you can use to attach headers.

    If view looked like this and you want to add a new header::

        def index():
            return render_template('index.html', foo=42)

    You can now do something like this::

        def index():
            response = make_response(render_template('index.html', foo=42))
            response.headers['X-Parachutes'] = 'parachutes are cool'
            return response

    This function accepts the very same arguments you can return from a
    view function.  This for example creates a response with a 404 error
    code::

        response = make_response(render_template('not_found.html'), 404)

    The other use case of this function is to force the return value of
    a view function into a response which is helpful with view
    decorators::

        response = make_response(view_function())
        response.headers['X-Parachutes'] = 'parachutes are cool'

    Internally this function does the following things:

    -   if no arguments are passed, it creates a new response argument
    -   if one argument is passed, :meth:`flask.Flask.make_response`
        is invoked with it.
    -   if more than one argument is passed, the arguments are passed
        to the :meth:`flask.Flask.make_response` function as tuple.

    .. versionadded:: 0.6
    """
    if not args:
        return current_app.response_class()

    # A single argument is handed over as-is, several stay a tuple.
    rv = args[0] if len(args) == 1 else args
    return current_app.make_response(rv)
def url_for(endpoint, **values):
    """Generates a URL to the given endpoint with the method provided.

    Variable arguments that are unknown to the target endpoint are
    appended to the generated URL as query arguments.  If the value of a
    query argument is ``None``, the whole pair is skipped.  In case
    blueprints are active you can shortcut references to the same
    blueprint by prefixing the local endpoint with a dot (``.``).

    This will reference the index function local to the current
    blueprint::

        url_for('.index')

    For more information, head over to the :ref:`Quickstart <url-building>`.

    Configuration values ``APPLICATION_ROOT`` and ``SERVER_NAME`` are
    only used when generating URLs outside of a request context.

    To integrate applications, :class:`Flask` has a hook to intercept
    URL build errors through :attr:`Flask.url_build_error_handlers`.
    The `url_for` function results in a
    :exc:`~werkzeug.routing.BuildError` when the current app does not
    have a URL for the given endpoint and values.  When it does, the
    :data:`~flask.current_app` calls its
    :attr:`~Flask.url_build_error_handlers` if it is not ``None``, which
    can return a string to use as the result of `url_for` (instead of
    `url_for`'s default to raise the :exc:`~werkzeug.routing.BuildError`
    exception) or re-raise the exception.

    An example::

        def external_url_handler(error, endpoint, values):
            "Looks up an external URL when `url_for` cannot build a URL."
            # This is an example of hooking the build_error_handler.
            # Here, lookup_url is some utility function you've built
            # which looks up the endpoint in some external URL registry.
            url = lookup_url(endpoint, **values)
            if url is None:
                # External lookup did not have a URL.
                # Re-raise the BuildError, in context of original traceback.
                exc_type, exc_value, tb = sys.exc_info()
                if exc_value is error:
                    raise exc_type, exc_value, tb
                else:
                    raise error
            # url_for will use this result, instead of raising BuildError.
            return url

        app.url_build_error_handlers.append(external_url_handler)

    Here, `error` is the instance of
    :exc:`~werkzeug.routing.BuildError`, and `endpoint` and `values` are
    the arguments passed into `url_for`.  Note that this is for building
    URLs outside the current application, and not for handling 404
    NotFound errors.

    .. versionadded:: 0.10
       The `_scheme` parameter was added.

    .. versionadded:: 0.9
       The `_anchor` and `_method` parameters were added.

    .. versionadded:: 0.9
       Calls :meth:`Flask.handle_build_error` on
       :exc:`~werkzeug.routing.BuildError`.

    :param endpoint: the endpoint of the URL (name of the function)
    :param values: the variable arguments of the URL rule
    :param _external: if set to ``True``, an absolute URL is generated.
      Server address can be changed via ``SERVER_NAME`` configuration
      variable which falls back to the `Host` header, then to the IP and
      port of the request.
    :param _scheme: a string specifying the desired URL scheme.  The
      `_external` parameter must be set to ``True`` or a
      :exc:`ValueError` is raised.  The default behavior uses the same
      scheme as the current request, or ``PREFERRED_URL_SCHEME`` from
      the :ref:`app configuration <config>` if no request context is
      available.  As of Werkzeug 0.10, this also can be set to an empty
      string to build protocol-relative URLs.
    :param _anchor: if provided this is added as anchor to the URL.
    :param _method: if provided this explicitly specifies an HTTP method.
    """
    appctx = _app_ctx_stack.top
    reqctx = _request_ctx_stack.top

    if appctx is None:
        raise RuntimeError(
            "Attempted to generate a URL without the application context being"
            " pushed. This has to be executed when application context is"
            " available."
        )

    if reqctx is None:
        # Without a request, go with the url adapter from the appctx and
        # make the URLs external by default.
        url_adapter = appctx.url_adapter

        if url_adapter is None:
            raise RuntimeError(
                "Application was not able to create a URL adapter for request"
                " independent URL generation. You might be able to fix this by"
                " setting the SERVER_NAME config variable."
            )

        external = values.pop("_external", True)
    else:
        # Request specific information is available, which gives us some
        # extra features that support "relative" URLs.
        url_adapter = reqctx.url_adapter
        blueprint_name = request.blueprint

        if endpoint[:1] == ".":
            if blueprint_name is None:
                endpoint = endpoint[1:]
            else:
                endpoint = blueprint_name + endpoint

        external = values.pop("_external", False)

    anchor = values.pop("_anchor", None)
    method = values.pop("_method", None)
    scheme = values.pop("_scheme", None)
    appctx.app.inject_url_defaults(endpoint, values)

    # This is not the best way to deal with this but currently the
    # underlying Werkzeug router does not support overriding the scheme
    # on a per build call basis.
    old_scheme = None

    if scheme is not None:
        if not external:
            raise ValueError("When specifying _scheme, _external must be True")

        old_scheme = url_adapter.url_scheme
        url_adapter.url_scheme = scheme

    try:
        try:
            rv = url_adapter.build(
                endpoint, values, method=method, force_external=external
            )
        finally:
            # Put the adapter's scheme back whether or not building worked.
            if old_scheme is not None:
                url_adapter.url_scheme = old_scheme
    except BuildError as error:
        # We need to inject the values again so that the app callback can
        # deal with that sort of stuff.
        values.update(
            _external=external, _anchor=anchor, _method=method, _scheme=scheme
        )
        return appctx.app.handle_url_build_error(error, endpoint, values)

    if anchor is not None:
        rv += "#" + url_quote(anchor)

    return rv
def get_template_attribute(template_name, attribute):
    """Loads a macro (or variable) a template exports.  This can be used
    to invoke a macro from within Python code.  If you for example have
    a template named :file:`_cider.html` with the following contents:

    .. sourcecode:: html+jinja

       {% macro hello(name) %}Hello {{ name }}!{% endmacro %}

    You can access this from Python code like this::

        hello = get_template_attribute('_cider.html', 'hello')
        return hello('World')

    .. versionadded:: 0.2

    :param template_name: the name of the template
    :param attribute: the name of the variable of macro to access
    """
    template = current_app.jinja_env.get_template(template_name)
    return getattr(template.module, attribute)
def flash(message, category="message"):
    """Flashes a message to the next request.  In order to remove the
    flashed message from the session and to display it to the user,
    the template has to call :func:`get_flashed_messages`.

    .. versionchanged:: 0.3
       `category` parameter added.

    :param message: the message to be flashed.
    :param category: the category for the message.  The following values
                     are recommended: ``'message'`` for any kind of
                     message, ``'error'`` for errors, ``'info'`` for
                     information messages and ``'warning'`` for
                     warnings.  However any kind of string can be used
                     as category.
    """
    # Original implementation:
    #
    #     session.setdefault('_flashes', []).append((category, message))
    #
    # This assumed that changes made to mutable structures in the session
    # are always in sync with the session object, which is not true for
    # session implementations that use external storage for keeping their
    # keys/values.  Hence the list is explicitly written back.
    pending = session.get("_flashes", [])
    pending.append((category, message))
    session["_flashes"] = pending

    app = current_app._get_current_object()
    message_flashed.send(app, message=message, category=category)
def get_flashed_messages(with_categories=False, category_filter=()):
    """Pulls all flashed messages from the session and returns them.
    Further calls in the same request to the function will return
    the same messages.  By default just the messages are returned,
    but when `with_categories` is set to ``True``, the return value will
    be a list of tuples in the form ``(category, message)`` instead.

    Filter the flashed messages to one or more categories by providing
    those categories in `category_filter`.  This allows rendering
    categories in separate html blocks.  The `with_categories` and
    `category_filter` arguments are distinct:

    * `with_categories` controls whether categories are returned with
      message text (``True`` gives a tuple, where ``False`` gives just
      the message text).
    * `category_filter` filters the messages down to only those matching
      the provided categories.

    See :ref:`message-flashing-pattern` for examples.

    .. versionchanged:: 0.3
       `with_categories` parameter added.

    .. versionchanged:: 0.9
        `category_filter` parameter added.

    :param with_categories: set to ``True`` to also receive categories.
    :param category_filter: whitelist of categories to limit return values
    """
    ctx = _request_ctx_stack.top
    flashes = ctx.flashes

    if flashes is None:
        # First call in this request: move the messages out of the
        # session and remember them on the request context.
        flashes = session.pop("_flashes") if "_flashes" in session else []
        ctx.flashes = flashes

    if category_filter:
        flashes = [entry for entry in flashes if entry[0] in category_filter]

    if with_categories:
        return flashes

    return [entry[1] for entry in flashes]
def send_file(
    filename_or_fp,
    mimetype=None,
    as_attachment=False,
    attachment_filename=None,
    add_etags=True,
    cache_timeout=None,
    conditional=False,
    last_modified=None,
):
    """Sends the contents of a file to the client.  This will use the
    most efficient method available and configured.  By default it will
    try to use the WSGI server's file_wrapper support.  Alternatively
    you can set the application's :attr:`~Flask.use_x_sendfile` attribute
    to ``True`` to directly emit an ``X-Sendfile`` header.  This however
    requires support of the underlying webserver for ``X-Sendfile``.

    By default it will try to guess the mimetype for you, but you can
    also explicitly provide one.  For extra security you probably want
    to send certain files as attachment (HTML for instance).  The
    mimetype guessing requires a `filename` or an `attachment_filename`
    to be provided.

    ETags will also be attached automatically if a `filename` is
    provided.  You can turn this off by setting `add_etags=False`.

    If `conditional=True` and `filename` is provided, this method will
    try to upgrade the response stream to support range requests.  This
    will allow the request to be answered with partial content response.

    Please never pass filenames to this function from user sources;
    you should use :func:`send_from_directory` instead.

    .. versionadded:: 0.2

    .. versionadded:: 0.5
       The `add_etags`, `cache_timeout` and `conditional` parameters were
       added.  The default behavior is now to attach etags.

    .. versionchanged:: 0.7
       mimetype guessing and etag support for file objects was
       deprecated because it was unreliable.  Pass a filename if you are
       able to, otherwise attach an etag yourself.  This functionality
       will be removed in Flask 1.0

    .. versionchanged:: 0.9
       cache_timeout pulls its default from application config, when None.

    .. versionchanged:: 0.12
       The filename is no longer automatically inferred from file
       objects.  If you want to use automatic mimetype and etag support,
       pass a filepath via `filename_or_fp` or `attachment_filename`.

    .. versionchanged:: 0.12
       The `attachment_filename` is preferred over `filename` for
       MIME-type detection.

    .. versionchanged:: 1.0
        UTF-8 filenames, as specified in `RFC 2231`_, are supported.

    .. _RFC 2231: https://tools.ietf.org/html/rfc2231#section-4

    .. versionchanged:: 1.0.3
        Filenames are encoded with ASCII instead of Latin-1 for broader
        compatibility with WSGI servers.

    .. versionchanged:: 1.1
        Filename may be a :class:`~os.PathLike` object.

    .. versionadded:: 1.1
        Partial content supports :class:`~io.BytesIO`.

    :param filename_or_fp: the filename of the file to send.
                           This is relative to the :attr:`~Flask.root_path`
                           if a relative path is specified.
                           Alternatively a file object might be provided in
                           which case ``X-Sendfile`` might not work and fall
                           back to the traditional method.  Make sure that
                           the file pointer is positioned at the start of
                           data to send before calling :func:`send_file`.
    :param mimetype: the mimetype of the file if provided.  If a file path
                     is given, auto detection happens as fallback,
                     otherwise an error will be raised.
    :param as_attachment: set to ``True`` if you want to send this file
                          with a ``Content-Disposition: attachment``
                          header.
    :param attachment_filename: the filename for the attachment if it
                                differs from the file's filename.
    :param add_etags: set to ``False`` to disable attaching of etags.
    :param conditional: set to ``True`` to enable conditional responses.
    :param cache_timeout: the timeout in seconds for the headers.  When
                          ``None`` (default), this value is set by
                          :meth:`~Flask.get_send_file_max_age` of
                          :data:`~flask.current_app`.
    :param last_modified: set the ``Last-Modified`` header to this value,
        a :class:`~datetime.datetime` or timestamp.
        If a file was passed, this overrides its mtime.
    """
    mtime = None
    fsize = None

    if hasattr(filename_or_fp, "__fspath__"):
        filename_or_fp = fspath(filename_or_fp)

    # Work out whether we were handed a path or an open file object.
    if isinstance(filename_or_fp, string_types):
        file = None
        filename = filename_or_fp

        if not os.path.isabs(filename):
            filename = os.path.join(current_app.root_path, filename)

        if attachment_filename is None:
            attachment_filename = os.path.basename(filename)
    else:
        file = filename_or_fp
        filename = None

    if mimetype is None and attachment_filename is not None:
        guessed = mimetypes.guess_type(attachment_filename)[0]
        mimetype = guessed or "application/octet-stream"

    if mimetype is None:
        raise ValueError(
            "Unable to infer MIME-type because no filename is available. "
            "Please set either `attachment_filename`, pass a filepath to "
            "`filename_or_fp` or set your own MIME-type via `mimetype`."
        )

    headers = Headers()

    if as_attachment:
        if attachment_filename is None:
            raise TypeError("filename unavailable, required for sending as attachment")

        if not isinstance(attachment_filename, text_type):
            attachment_filename = attachment_filename.decode("utf-8")

        try:
            filenames = {"filename": attachment_filename.encode("ascii")}
        except UnicodeEncodeError:
            # Not plain ASCII: send a stripped down ASCII variant plus
            # the UTF-8 encoded ``filename*`` form.
            filenames = {
                "filename": unicodedata.normalize("NFKD", attachment_filename).encode(
                    "ascii", "ignore"
                ),
                "filename*": "UTF-8''%s" % url_quote(attachment_filename, safe=b""),
            }

        headers.add("Content-Disposition", "attachment", **filenames)

    if current_app.use_x_sendfile and filename:
        if file is not None:
            file.close()

        headers["X-Sendfile"] = filename
        fsize = os.path.getsize(filename)
        headers["Content-Length"] = fsize
        data = None
    else:
        if file is None:
            file = open(filename, "rb")
            mtime = os.path.getmtime(filename)
            fsize = os.path.getsize(filename)
            headers["Content-Length"] = fsize
        elif isinstance(file, io.BytesIO):
            try:
                fsize = file.getbuffer().nbytes
            except AttributeError:
                # Python 2 doesn't have getbuffer
                fsize = len(file.getvalue())

            headers["Content-Length"] = fsize

        data = wrap_file(request.environ, file)

    rv = current_app.response_class(
        data, mimetype=mimetype, headers=headers, direct_passthrough=True
    )

    # An explicitly passed value wins over the file's own mtime.
    modified = last_modified if last_modified is not None else mtime

    if modified is not None:
        rv.last_modified = modified

    rv.cache_control.public = True

    if cache_timeout is None:
        cache_timeout = current_app.get_send_file_max_age(filename)

    if cache_timeout is not None:
        rv.cache_control.max_age = cache_timeout
        rv.expires = int(time() + cache_timeout)

    if add_etags and filename is not None:
        from warnings import warn

        try:
            rv.set_etag(
                "%s-%s-%s"
                % (
                    os.path.getmtime(filename),
                    os.path.getsize(filename),
                    adler32(
                        filename.encode("utf-8")
                        if isinstance(filename, text_type)
                        else filename
                    )
                    & 0xFFFFFFFF,
                )
            )
        except OSError:
            warn(
                "Access %s failed, maybe it does not exist, so ignore etags in "
                "headers" % filename,
                stacklevel=2,
            )

    if conditional:
        try:
            rv = rv.make_conditional(request, accept_ranges=True, complete_length=fsize)
        except RequestedRangeNotSatisfiable:
            # The response is not going to be sent, so release the file.
            if file is not None:
                file.close()

            raise

        # make sure we don't send x-sendfile for servers that
        # ignore the 304 status code for x-sendfile.
        if rv.status_code == 304:
            rv.headers.pop("x-sendfile", None)

    return rv
def safe_join(directory, *pathnames):
    """Safely join `directory` and zero or more untrusted `pathnames`
    components.

    Example usage::

        @app.route('/wiki/<path:filename>')
        def wiki_page(filename):
            filename = safe_join(app.config['WIKI_FOLDER'], filename)
            with open(filename, 'rb') as fd:
                content = fd.read()  # Read and process the file content...

    :param directory: the trusted base directory.
    :param pathnames: the untrusted pathnames relative to that directory.
    :raises: :class:`~werkzeug.exceptions.NotFound` if one or more passed
            paths fall out of its boundaries.
    """
    parts = [directory]

    for filename in pathnames:
        # Empty components are kept as they are; everything else is
        # normalised first so that the checks below see the final form.
        if filename != "":
            filename = posixpath.normpath(filename)

        leaves_directory = (
            any(sep in filename for sep in _os_alt_seps)
            or os.path.isabs(filename)
            or filename == ".."
            or filename.startswith("../")
        )

        if leaves_directory:
            raise NotFound()

        parts.append(filename)

    return posixpath.join(*parts)
def send_from_directory(directory, filename, **options):
    """Send a file from a given directory with :func:`send_file`.  This
    is a secure way to quickly expose static files from an upload folder
    or something similar.

    Example usage::

        @app.route('/uploads/<path:filename>')
        def download_file(filename):
            return send_from_directory(app.config['UPLOAD_FOLDER'],
                                       filename, as_attachment=True)

    .. admonition:: Sending files and Performance

       It is strongly recommended to activate either ``X-Sendfile``
       support in your webserver or (if no authentication happens) to
       tell the webserver to serve files for the given path on its own
       without calling into the web application for improved
       performance.

    .. versionadded:: 0.5

    :param directory: the directory where all the files are stored.
    :param filename: the filename relative to that directory to
                     download.
    :param options: optional keyword arguments that are directly
                    forwarded to :func:`send_file`.
    """
    filename = fspath(filename)
    directory = fspath(directory)
    path = safe_join(directory, filename)

    if not os.path.isabs(path):
        path = os.path.join(current_app.root_path, path)

    try:
        exists = os.path.isfile(path)
    except (TypeError, ValueError):
        # The path could not even be checked on this platform.
        raise BadRequest()

    if not exists:
        raise NotFound()

    options.setdefault("conditional", True)
    return send_file(path, **options)
def get_root_path(import_name):
    """Returns the path to a package or cwd if that cannot be found.  This
    returns the path of a package or the folder that contains a module.

    Not to be confused with the package path returned by :func:`find_package`.

    :param import_name: the dotted import name of the module or package.
    :raises RuntimeError: if a loader exists for the module but no file
        name can be determined for it (import hooks that do not provide
        file name information, or namespace packages).
    """
    # Module already imported and has a usable file attribute.  Use that
    # first.  ``__file__`` may be present but ``None`` (namespace
    # packages), which must not be handed to ``os.path.abspath``.
    mod = sys.modules.get(import_name)
    mod_file = getattr(mod, "__file__", None)

    if mod_file is not None:
        return os.path.dirname(os.path.abspath(mod_file))

    # Next attempt: check the loader.
    if sys.version_info >= (3, 4):
        # ``pkgutil.get_loader`` is deprecated and no longer available on
        # recent Python versions, so ask the import machinery directly.
        import importlib.util

        try:
            spec = importlib.util.find_spec(import_name)
        except ValueError:
            # Raised for a module that is already imported but has no
            # ``__spec__`` set.  Use the loader recorded on the module
            # itself, if there is one.
            loader = getattr(mod, "__loader__", None)
        else:
            loader = spec.loader if spec is not None else None
    else:
        loader = pkgutil.get_loader(import_name)

    # Loader does not exist or we're referring to an unloaded main module
    # or a main module without path (interactive sessions), go with the
    # current working directory.
    if loader is None or import_name == "__main__":
        return os.getcwd()

    # For .egg, zipimporter does not have get_filename until Python 2.7.
    # Some other loaders might exhibit the same behavior.
    if hasattr(loader, "get_filename"):
        filepath = loader.get_filename(import_name)
    else:
        # Fall back to imports.
        __import__(import_name)
        mod = sys.modules[import_name]
        filepath = getattr(mod, "__file__", None)

        # If we don't have a filepath it might be because we are a
        # namespace package.  In this case the root path cannot be
        # guessed and has to be provided explicitly.
        if filepath is None:
            raise RuntimeError(
                "No root path can be found for the provided "
                'module "%s".  This can happen because the '
                "module came from an import hook that does "
                "not provide file name information or because "
                "it's a namespace package.  In this case "
                "the root path needs to be explicitly "
                "provided." % import_name
            )

    # filepath is import_name.py for a module, or __init__.py for a package.
    return os.path.dirname(os.path.abspath(filepath))
def _matching_loader_thinks_module_is_package(loader, mod_name):
    """Given the loader that loaded a module and the module name, try to
    figure out whether the given module is actually a package.

    :param loader: the loader object that loaded the module.
    :param mod_name: the name of the module in question.
    :raises AttributeError: if the loader offers no way to tell.
    """
    # If the loader can tell us if something is a package, we can
    # directly ask the loader.
    if hasattr(loader, "is_package"):
        return loader.is_package(mod_name)

    loader_cls = loader.__class__

    # importlib's namespace loaders do not have this functionality but
    # all the modules it loads are packages, so we can take advantage of
    # this information.
    if (
        loader_cls.__module__ == "_frozen_importlib"
        and loader_cls.__name__ == "NamespaceLoader"
    ):
        return True

    # Otherwise we need to fail with an error that explains what went
    # wrong.
    raise AttributeError(
        (
            "%s.is_package() method is missing but is required by Flask of "
            "PEP 302 import hooks.  If you do not use import hooks and "
            "you encounter this error please file a bug against Flask."
        )
        % loader_cls.__name__
    )
def _find_package_path(root_mod_name):
    """Find the path where the module's root exists in.

    :param root_mod_name: the name of the top-level module or package.
    :return: for a package, the directory containing the package
        directory; for a plain module, the directory containing the
        module file; the current working directory when the module
        cannot be found or is ``__main__`` without import information.
    """
    if sys.version_info >= (3, 4):
        import importlib.util

        try:
            spec = importlib.util.find_spec(root_mod_name)

            if spec is None:
                raise ValueError("not found")
        # ImportError: the machinery told us it does not exist
        # ValueError:
        #    - the module name was invalid
        #    - the module name is __main__
        #    - *we* raised `ValueError` due to `spec` being `None`
        except (ImportError, ValueError):
            # No spec available.  ``pkgutil.get_loader`` is deprecated
            # and no longer available on recent Python versions, so rely
            # on the loader recorded on an already imported module, if
            # any; without one the working directory is used below.
            loader = getattr(sys.modules.get(root_mod_name), "__loader__", None)
        else:
            # namespace package
            if spec.origin in {"namespace", None}:
                return os.path.dirname(next(iter(spec.submodule_search_locations)))
            # a package (with __init__.py)
            elif spec.submodule_search_locations:
                return os.path.dirname(os.path.dirname(spec.origin))
            # just a normal module
            else:
                return os.path.dirname(spec.origin)
    else:
        # No PEP 451 machinery on this interpreter.
        loader = pkgutil.get_loader(root_mod_name)

    if loader is None or root_mod_name == "__main__":
        # import name is not found, or interactive/main module
        return os.getcwd()

    # For .egg, zipimporter does not have get_filename until Python 2.7.
    if hasattr(loader, "get_filename"):
        filename = loader.get_filename(root_mod_name)
    elif hasattr(loader, "archive"):
        # zipimporter's loader.archive points to the .egg or .zip
        # archive filename is dropped in call to dirname below.
        filename = loader.archive
    else:
        # At least one loader is missing both get_filename and archive:
        # Google App Engine's HardenedModulesHook
        #
        # Fall back to imports.
        __import__(root_mod_name)
        filename = sys.modules[root_mod_name].__file__

    package_path = os.path.abspath(os.path.dirname(filename))

    # In case the root module is a package we need to chop of the
    # rightmost part.  This needs to go through a helper function
    # because of python 3.3 namespace packages.
    if _matching_loader_thinks_module_is_package(loader, root_mod_name):
        package_path = os.path.dirname(package_path)

    return package_path
def find_package(import_name):
    """Finds a package and returns the prefix (or None if the package is
    not installed) as well as the folder that contains the package or
    module as a tuple.  The package path returned is the folder that would
    have to be added to the pythonpath in order to make it possible to
    import the module.  The prefix is the path below which a UNIX like
    folder structure exists (lib, share etc.).

    :param import_name: dotted import name; only the part before the first
        dot (the root module) is looked up.
    :return: a ``(prefix, package_path)`` tuple. ``prefix`` is
        ``sys.prefix`` when the package lives at or below it, the base
        directory derived from a ``site-packages`` layout otherwise, or
        ``None`` when neither applies.
    """
    root_mod_name, _, _ = import_name.partition(".")
    package_path = _find_package_path(root_mod_name)
    py_prefix = os.path.abspath(sys.prefix)

    # Installed below the interpreter prefix. The comparison is done on a
    # path-component boundary: a bare ``startswith(py_prefix)`` would also
    # match a sibling directory such as ``/usr/local-other`` for the prefix
    # ``/usr/local``. ``os.path.join(py_prefix, "")`` yields the prefix with
    # exactly one trailing separator.
    if package_path == py_prefix or package_path.startswith(
        os.path.join(py_prefix, "")
    ):
        return py_prefix, package_path

    site_parent, site_folder = os.path.split(package_path)

    if site_folder.lower() == "site-packages":
        parent, folder = os.path.split(site_parent)

        if folder.lower() == "lib":
            # Windows like installations: site-packages sits directly
            # inside "lib".
            base_dir = parent
        elif os.path.basename(parent).lower() == "lib":
            # UNIX like installations: site-packages sits one level
            # deeper, inside a sub-folder of "lib".
            base_dir = os.path.dirname(parent)
        else:
            base_dir = site_parent

        return base_dir, package_path

    return None, package_path
class locked_cached_property(object):
    """A decorator that turns a method into a lazily computed property.

    The wrapped function runs on first access; its result is stored in the
    instance ``__dict__`` under the property's name and returned from there
    on later accesses. Works like the one in Werkzeug, but the lookup and
    computation happen while holding a lock for thread safety.

    :param func: the function to wrap; it is called with the instance.
    :param name: name to store the value under; defaults to
        ``func.__name__``.
    :param doc: docstring for the property; defaults to ``func.__doc__``.
    """

    def __init__(self, func, name=None, doc=None):
        self.__name__ = name or func.__name__
        self.__module__ = func.__module__
        self.__doc__ = doc or func.__doc__
        self.func = func
        # One re-entrant lock per descriptor, shared by every instance of
        # the owning class.
        self.lock = RLock()

    def __get__(self, obj, type=None):
        # Accessed on the class rather than an instance: hand back the
        # descriptor itself.
        if obj is None:
            return self

        with self.lock:
            key = self.__name__
            cached = obj.__dict__.get(key, _missing)

            if cached is not _missing:
                return cached

            # First access: compute the value and remember it.
            computed = self.func(obj)
            obj.__dict__[key] = computed
            return computed
class _PackageBoundObject(object):
    """Base for objects that are tied to a package or module on disk.

    Holds the import name, root path, template folder and static folder
    configuration, and offers helpers to serve static files and open
    resources relative to the root path.
    """

    #: The name of the package or module that this app belongs to. Do not
    #: change this once it is set by the constructor.
    import_name = None

    #: Location of the template files to be added to the template lookup.
    #: ``None`` if templates should not be added.
    template_folder = None

    #: Absolute path to the package on the filesystem. Used to look up
    #: resources contained in the package.
    root_path = None

    def __init__(self, import_name, template_folder=None, root_path=None):
        """Bind the object to ``import_name``.

        :param import_name: name of the package or module this object
            belongs to.
        :param template_folder: folder with templates, or ``None``.
        :param root_path: root path to use; when ``None`` it is looked up
            from ``import_name`` with :func:`get_root_path`.
        """
        self.import_name = import_name
        self.template_folder = template_folder
        self.root_path = (
            get_root_path(self.import_name) if root_path is None else root_path
        )

        self._static_folder = None
        self._static_url_path = None

        # circular import
        from .cli import AppGroup

        #: The Click command group for registration of CLI commands
        #: on the application and associated blueprints. These commands
        #: are accessible via the :command:`flask` command once the
        #: application has been discovered and blueprints registered.
        self.cli = AppGroup()

    @property
    def static_folder(self):
        """The configured static folder joined onto :attr:`root_path`, or
        ``None`` if no static folder is configured.
        """
        configured = self._static_folder
        if configured is None:
            return None
        return os.path.join(self.root_path, configured)

    @static_folder.setter
    def static_folder(self, value):
        # Trailing slashes and backslashes are dropped before storing.
        self._static_folder = None if value is None else value.rstrip("/\\")

    @property
    def static_url_path(self):
        """The URL prefix that the static route will be accessible from.

        If it was not configured explicitly, it is derived from the last
        path component of :attr:`static_folder`. ``None`` if neither is
        available.
        """
        explicit = self._static_url_path
        if explicit is not None:
            return explicit

        folder = self.static_folder
        if folder is None:
            return None

        return ("/" + os.path.basename(folder)).rstrip("/")

    @static_url_path.setter
    def static_url_path(self, value):
        # Trailing slashes are dropped before storing.
        self._static_url_path = None if value is None else value.rstrip("/")

    @property
    def has_static_folder(self):
        """``True`` if :attr:`static_folder` is set for this package bound
        object.

        .. versionadded:: 0.5
        """
        return self.static_folder is not None

    @locked_cached_property
    def jinja_loader(self):
        """The Jinja loader for this package bound object, or ``None`` if
        no template folder is set.

        .. versionadded:: 0.5
        """
        if self.template_folder is None:
            return None

        search_path = os.path.join(self.root_path, self.template_folder)
        return FileSystemLoader(search_path)

    def get_send_file_max_age(self, filename):
        """Provides default cache_timeout for the :func:`send_file` functions.

        By default, this function returns ``SEND_FILE_MAX_AGE_DEFAULT`` from
        the configuration of :data:`~flask.current_app`.

        Static file functions such as :func:`send_from_directory` use this
        function, and :func:`send_file` calls this function on
        :data:`~flask.current_app` when the given cache_timeout is ``None``.
        If a cache_timeout is given in :func:`send_file`, that timeout is
        used; otherwise, this method is called.

        This allows subclasses to change the behavior when sending files
        based on the filename.  For example, to set the cache timeout for
        .js files to 60 seconds::

            class MyFlask(flask.Flask):
                def get_send_file_max_age(self, name):
                    if name.lower().endswith('.js'):
                        return 60
                    return flask.Flask.get_send_file_max_age(self, name)

        .. versionadded:: 0.9
        """
        default_max_age = current_app.send_file_max_age_default
        return total_seconds(default_max_age)

    def send_static_file(self, filename):
        """Function used internally to send static files from the static
        folder to the browser.

        :param filename: name of the file inside the static folder.
        :raises RuntimeError: if this object has no static folder.

        .. versionadded:: 0.5
        """
        if not self.has_static_folder:
            raise RuntimeError("No static folder for this object")

        # Ensure get_send_file_max_age is called in all cases.
        # Here, we ensure get_send_file_max_age is called for Blueprints.
        max_age = self.get_send_file_max_age(filename)
        return send_from_directory(
            self.static_folder, filename, cache_timeout=max_age
        )

    def open_resource(self, resource, mode="rb"):
        """Opens a resource from the application's resource folder.  To see
        how this works, consider the following folder structure::

            /myapplication.py
            /schema.sql
            /static
                /style.css
            /templates
                /layout.html
                /index.html

        If you want to open the :file:`schema.sql` file you would do the
        following::

            with app.open_resource('schema.sql') as f:
                contents = f.read()
                do_something_with(contents)

        :param resource: the name of the resource.  To access resources
                         within subfolders use forward slashes as separator.
        :param mode: Open file in this mode.  Only reading is supported,
                     valid values are "r" (or "rt") and "rb".
        :raises ValueError: if ``mode`` is not one of the read-only modes.
        """
        if mode not in {"r", "rt", "rb"}:
            raise ValueError("Resources can only be opened for reading")

        resource_path = os.path.join(self.root_path, resource)
        return open(resource_path, mode)
def total_seconds(td):
    """Returns the total seconds from a timedelta object.

    Only the ``days`` and ``seconds`` fields are used; microseconds are
    not included in the result.

    :param timedelta td: the timedelta to be converted in seconds

    :returns: number of seconds
    :rtype: int
    """
    whole_days, leftover_seconds = td.days, td.seconds
    # 86400 == 60 * 60 * 24, the number of seconds in one day.
    return whole_days * 86400 + leftover_seconds
def is_ip(value):
    """Determine if the given string is an IP address.

    Python 2 on Windows doesn't provide ``inet_pton``, so this only
    checks IPv4 addresses in that environment. The missing function is
    detected directly on the ``socket`` module.

    Strings the socket functions refuse to parse at all (for example ones
    containing an embedded NUL character, which raise ``ValueError``
    rather than ``socket.error``) are reported as "not an IP address"
    instead of propagating the exception.

    :param value: value to check
    :type value: str

    :return: True if string is an IP address
    :rtype: bool
    """
    if not hasattr(socket, "inet_pton"):
        # Only ``inet_aton`` is available, so only IPv4 can be checked.
        try:
            socket.inet_aton(value)
        except (socket.error, ValueError):
            return False
        return True

    for family in (socket.AF_INET, socket.AF_INET6):
        try:
            socket.inet_pton(family, value)
        except (socket.error, ValueError):
            # Not valid for this address family; try the next one.
            continue
        return True

    return False
|