Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/environ/environ.py: 41%
450 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1# This file is part of the django-environ.
2#
3# Copyright (c) 2021-2022, Serghei Iakovlev <egrep@protonmail.ch>
4# Copyright (c) 2013-2021, Daniele Faraglia <daniele.faraglia@gmail.com>
5#
6# For the full copyright and license information, please view
7# the LICENSE.txt file that was distributed with this source code.
9"""
10Django-environ allows you to utilize 12factor inspired environment
11variables to configure your Django application.
12"""
14import ast
15import itertools
16import logging
17import os
18import re
19import sys
20import urllib.parse as urlparselib
21import warnings
22from urllib.parse import (
23 parse_qs,
24 ParseResult,
25 unquote,
26 unquote_plus,
27 urlparse,
28 urlunparse,
29)
31from .compat import (
32 DJANGO_POSTGRES,
33 ImproperlyConfigured,
34 json,
35 PYMEMCACHE_DRIVER,
36 REDIS_DRIVER,
37)
38from .fileaware_mapping import FileAwareMapping
40try:
41 from os import PathLike
42except ImportError: # Python 3.5 support
43 from pathlib import PurePath as PathLike
# Types that ``Env.read_env`` treats as a filesystem path to be opened,
# as opposed to an already-open file-like object.
Openable = (str, PathLike)

# Module-level logger; named after the importing module.
logger = logging.getLogger(__name__)
def _cast(value):
    """Evaluate ``value`` as a Python literal when possible.

    Uses :func:`ast.literal_eval`, so only literals and container displays
    are evaluated. When the input is not a valid literal (``ValueError`` or
    ``SyntaxError``), it is handed back untouched.

    https://docs.python.org/3/library/ast.html#ast.literal_eval
    """
    try:
        parsed = ast.literal_eval(value)
    except (ValueError, SyntaxError):
        return value
    return parsed
def _cast_int(v):
    """Return ``v`` as an ``int`` if possible, otherwise return it unchanged.

    Only objects exposing ``isdigit`` (``str``/``bytes``) whose content is
    entirely digits are converted; everything else (``None``, numbers,
    signed or non-numeric text) is returned as-is.
    """
    if not (hasattr(v, 'isdigit') and v.isdigit()):
        return v
    try:
        return int(v)
    except ValueError:
        # ``isdigit`` is true for some characters that ``int`` refuses,
        # e.g. SUPERSCRIPT TWO (U+00B2). Keep the "if possible" contract
        # instead of letting the conversion error escape.
        return v
def _cast_urlstr(v):
    """Percent-decode ``v`` when it is a ``str``; pass anything else through."""
    if not isinstance(v, str):
        return v
    return unquote(v)
class NoValue:
    """Sentinel type used to mark "no default supplied".

    ``Env.NOTSET`` is an instance of this class, which lets ``None`` remain
    a legitimate default value.
    """

    def __repr__(self):
        # Render as ``<ClassName>`` so subclasses show their own name.
        return '<%s>' % self.__class__.__name__
class Env:
    """Provide scheme-based lookups of environment variables so that each
    caller doesn't have to pass in ``cast`` and ``default`` parameters.

    Usage:::

        import environ
        import os

        env = environ.Env(
            # set casting, default value
            MAIL_ENABLED=(bool, False),
            SMTP_LOGIN=(str, 'DEFAULT')
        )

        # Set the project base directory
        BASE_DIR = os.path.dirname(
            os.path.dirname(os.path.abspath(__file__))
        )

        # Take environment variables from .env file
        environ.Env.read_env(os.path.join(BASE_DIR, '.env'))

        # False if not in os.environ due to casting above
        MAIL_ENABLED = env('MAIL_ENABLED')

        # 'DEFAULT' if not in os.environ due to casting above
        SMTP_LOGIN = env('SMTP_LOGIN')
    """

    # Mapping that variables are read from (and ``read_env`` writes to).
    ENVIRON = os.environ
    # Sentinel meaning "no default given"; distinct from ``None``.
    NOTSET = NoValue()
    # Lower-cased strings that a ``bool`` cast treats as true.
    BOOLEAN_TRUE_STRINGS = ('true', 'on', 'ok', 'y', 'yes', '1')
    # Type used to recognise already-parsed URLs in the ``*_url_config``
    # helpers.
    URL_CLASS = ParseResult

    POSTGRES_FAMILY = ['postgres', 'postgresql', 'psql', 'pgsql', 'postgis']
    ELASTICSEARCH_FAMILY = ['elasticsearch' + x for x in ['', '2', '5', '7']]

    DEFAULT_DATABASE_ENV = 'DATABASE_URL'
    DB_SCHEMES = {
        'postgres': DJANGO_POSTGRES,
        'postgresql': DJANGO_POSTGRES,
        'psql': DJANGO_POSTGRES,
        'pgsql': DJANGO_POSTGRES,
        'postgis': 'django.contrib.gis.db.backends.postgis',
        'mysql': 'django.db.backends.mysql',
        'mysql2': 'django.db.backends.mysql',
        'mysql-connector': 'mysql.connector.django',
        'mysqlgis': 'django.contrib.gis.db.backends.mysql',
        'mssql': 'sql_server.pyodbc',
        'oracle': 'django.db.backends.oracle',
        'pyodbc': 'sql_server.pyodbc',
        'redshift': 'django_redshift_backend',
        'spatialite': 'django.contrib.gis.db.backends.spatialite',
        'sqlite': 'django.db.backends.sqlite3',
        'ldap': 'ldapdb.backends.ldap',
    }
    # Query-string keys that become top-level DB settings rather than
    # entries of ``OPTIONS``.
    _DB_BASE_OPTIONS = [
        'CONN_MAX_AGE',
        'ATOMIC_REQUESTS',
        'AUTOCOMMIT',
        'DISABLE_SERVER_SIDE_CURSORS',
    ]

    DEFAULT_CACHE_ENV = 'CACHE_URL'
    CACHE_SCHEMES = {
        'dbcache': 'django.core.cache.backends.db.DatabaseCache',
        'dummycache': 'django.core.cache.backends.dummy.DummyCache',
        'filecache': 'django.core.cache.backends.filebased.FileBasedCache',
        'locmemcache': 'django.core.cache.backends.locmem.LocMemCache',
        'memcache': 'django.core.cache.backends.memcached.MemcachedCache',
        'pymemcache': PYMEMCACHE_DRIVER,
        'pylibmc': 'django.core.cache.backends.memcached.PyLibMCCache',
        'rediscache': REDIS_DRIVER,
        'redis': REDIS_DRIVER,
        'rediss': REDIS_DRIVER,
    }
    # Query-string keys that become top-level cache settings rather than
    # entries of ``OPTIONS``.
    _CACHE_BASE_OPTIONS = [
        'TIMEOUT',
        'KEY_PREFIX',
        'VERSION',
        'KEY_FUNCTION',
        'BINARY',
    ]

    DEFAULT_EMAIL_ENV = 'EMAIL_URL'
    EMAIL_SCHEMES = {
        'smtp': 'django.core.mail.backends.smtp.EmailBackend',
        'smtps': 'django.core.mail.backends.smtp.EmailBackend',
        'smtp+tls': 'django.core.mail.backends.smtp.EmailBackend',
        'smtp+ssl': 'django.core.mail.backends.smtp.EmailBackend',
        'consolemail': 'django.core.mail.backends.console.EmailBackend',
        'filemail': 'django.core.mail.backends.filebased.EmailBackend',
        'memorymail': 'django.core.mail.backends.locmem.EmailBackend',
        'dummymail': 'django.core.mail.backends.dummy.EmailBackend'
    }
    # Query-string keys that become top-level email settings rather than
    # entries of ``OPTIONS``.
    _EMAIL_BASE_OPTIONS = ['EMAIL_USE_TLS', 'EMAIL_USE_SSL']

    DEFAULT_SEARCH_ENV = 'SEARCH_URL'
    SEARCH_SCHEMES = {
        "elasticsearch": "haystack.backends.elasticsearch_backend."
                         "ElasticsearchSearchEngine",
        "elasticsearch2": "haystack.backends.elasticsearch2_backend."
                          "Elasticsearch2SearchEngine",
        "elasticsearch5": "haystack.backends.elasticsearch5_backend."
                          "Elasticsearch5SearchEngine",
        "elasticsearch7": "haystack.backends.elasticsearch7_backend."
                          "Elasticsearch7SearchEngine",
        "solr": "haystack.backends.solr_backend.SolrEngine",
        "whoosh": "haystack.backends.whoosh_backend.WhooshEngine",
        "xapian": "haystack.backends.xapian_backend.XapianEngine",
        "simple": "haystack.backends.simple_backend.SimpleEngine",
    }
    CLOUDSQL = 'cloudsql'

    def __init__(self, **scheme):
        """Create a lookup helper.

        :param scheme: Per-variable declarations. Each value is either a
            cast, or a two-item ``(cast, default)`` sequence.
        """
        self.smart_cast = True
        self.escape_proxy = False
        self.prefix = ""
        self.scheme = scheme

    def __call__(self, var, cast=None, default=NOTSET, parse_default=False):
        """Shortcut for :meth:`get_value` with the same arguments."""
        return self.get_value(
            var,
            cast=cast,
            default=default,
            parse_default=parse_default
        )

    def __contains__(self, var):
        """Tell whether ``var`` is a key of ``ENVIRON`` (prefix not applied)."""
        return var in self.ENVIRON

    # Shortcuts

    def str(self, var, default=NOTSET, multiline=False):
        """Return the variable cast to ``str``.

        With ``multiline=True`` the two-character sequences ``\\n`` and
        ``\\r\\n`` in the value are replaced by a real newline.

        :rtype: str
        """
        value = self.get_value(var, cast=str, default=default)
        if multiline:
            return re.sub(r'(\\r)?\\n', r'\n', value)
        return value

    def unicode(self, var, default=NOTSET):
        """Helper for python2; deprecated alias of :meth:`str`.

        Emits a ``DeprecationWarning`` on every call.

        :rtype: unicode
        """
        warnings.warn(
            '`%s.unicode` is deprecated, use `%s.str` instead' % (
                self.__class__.__name__,
                self.__class__.__name__,
            ),
            DeprecationWarning,
            stacklevel=2
        )

        return self.get_value(var, cast=str, default=default)

    def bytes(self, var, default=NOTSET, encoding='utf8'):
        """Return the variable encoded with ``encoding``.

        A value without an ``encode`` method (for example a non-string
        default) is returned unchanged.

        :rtype: bytes
        """
        value = self.get_value(var, cast=str, default=default)
        if hasattr(value, 'encode'):
            return value.encode(encoding)
        return value

    def bool(self, var, default=NOTSET):
        """
        :rtype: bool
        """
        return self.get_value(var, cast=bool, default=default)

    def int(self, var, default=NOTSET):
        """
        :rtype: int
        """
        return self.get_value(var, cast=int, default=default)

    def float(self, var, default=NOTSET):
        """
        :rtype: float
        """
        return self.get_value(var, cast=float, default=default)

    def json(self, var, default=NOTSET):
        """
        :returns: Json parsed
        """
        return self.get_value(var, cast=json.loads, default=default)

    def list(self, var, cast=None, default=NOTSET):
        """Return a comma-separated variable as a list.

        :param cast: Optional callable applied to every item.
        :rtype: list
        """
        return self.get_value(
            var,
            cast=list if not cast else [cast],
            default=default
        )

    def tuple(self, var, cast=None, default=NOTSET):
        """Return a comma-separated variable as a tuple.

        :param cast: Optional callable applied to every item.
        :rtype: tuple
        """
        return self.get_value(
            var,
            cast=tuple if not cast else (cast,),
            default=default
        )

    def dict(self, var, cast=dict, default=NOTSET):
        """
        :rtype: dict
        """
        return self.get_value(var, cast=cast, default=default)

    def url(self, var, default=NOTSET):
        """Return the variable parsed with ``urlparse``.

        A truthy default is parsed as well (``parse_default=True``).

        :rtype: urllib.parse.ParseResult
        """
        return self.get_value(
            var,
            cast=urlparse,
            default=default,
            parse_default=True
        )

    def db_url(self, var=DEFAULT_DATABASE_ENV, default=NOTSET, engine=None):
        """Returns a config dictionary, defaulting to DATABASE_URL.

        The db method is an alias for db_url.

        :rtype: dict
        """
        return self.db_url_config(
            self.get_value(var, default=default),
            engine=engine
        )

    db = db_url

    def cache_url(self, var=DEFAULT_CACHE_ENV, default=NOTSET, backend=None):
        """Returns a config dictionary, defaulting to CACHE_URL.

        The cache method is an alias for cache_url.

        :rtype: dict
        """
        return self.cache_url_config(
            self.url(var, default=default),
            backend=backend
        )

    cache = cache_url

    def email_url(self, var=DEFAULT_EMAIL_ENV, default=NOTSET, backend=None):
        """Returns a config dictionary, defaulting to EMAIL_URL.

        The email method is an alias for email_url.

        :rtype: dict
        """
        return self.email_url_config(
            self.url(var, default=default),
            backend=backend
        )

    email = email_url

    def search_url(self, var=DEFAULT_SEARCH_ENV, default=NOTSET, engine=None):
        """Returns a config dictionary, defaulting to SEARCH_URL.

        :rtype: dict
        """
        return self.search_url_config(
            self.url(var, default=default),
            engine=engine
        )

    def path(self, var, default=NOTSET, **kwargs):
        """Return the variable wrapped in a :class:`Path`.

        :param kwargs: Forwarded to the ``Path`` constructor.
        :rtype: Path
        """
        return Path(self.get_value(var, default=default), **kwargs)

    def get_value(self, var, cast=None, default=NOTSET, parse_default=False):
        """Return value for given environment variable.

        :param str var:
            Name of variable.
        :param collections.abc.Callable or None cast:
            Type to cast return value as.
        :param default:
            If var not present in environ, return this instead.
        :param bool parse_default:
            Force to parse default.
        :returns: Value from environment or default (if set); its type
            depends on ``cast``.
        :raises ImproperlyConfigured: If the variable is missing and no
            default is available.
        """

        logger.debug("get '{}' casted as '{}' with default '{}'".format(
            var, cast, default
        ))

        var_name = "{}{}".format(self.prefix, var)
        if var_name in self.scheme:
            var_info = self.scheme[var_name]

            # A scheme entry is either ``(cast, default)`` or a bare cast;
            # a bare cast usually has no ``len``.
            try:
                has_default = len(var_info) == 2
            except TypeError:
                has_default = False

            if has_default:
                if not cast:
                    cast = var_info[0]

                if default is self.NOTSET:
                    try:
                        default = var_info[1]
                    except IndexError:
                        pass
            else:
                if not cast:
                    cast = var_info

        try:
            value = self.ENVIRON[var_name]
        except KeyError as exc:
            if default is self.NOTSET:
                error_msg = "Set the {} environment variable".format(var)
                raise ImproperlyConfigured(error_msg) from exc

            value = default

        # Resolve any proxied values: ``$OTHER`` means "use the value of
        # OTHER instead".
        prefix = b'$' if isinstance(value, bytes) else '$'
        escape = rb'\$' if isinstance(value, bytes) else r'\$'
        if hasattr(value, 'startswith') and value.startswith(prefix):
            value = value.lstrip(prefix)
            value = self.get_value(value, cast=cast, default=default)

        if self.escape_proxy and hasattr(value, 'replace'):
            value = value.replace(escape, prefix)

        # Smart casting: with no explicit cast, reuse the default's type.
        if self.smart_cast:
            if cast is None and default is not None and \
                    not isinstance(default, NoValue):
                cast = type(default)

        # An empty string counts as "unset" when the default is None.
        value = None if default is None and value == '' else value

        # A value that simply is the default is returned unparsed unless
        # the caller asked for the default to be parsed too.
        if value != default or (parse_default and value):
            value = self.parse_value(value, cast)

        return value

    # Class and static methods

    @classmethod
    def parse_value(cls, value, cast):
        """Parse and cast provided value

        :param value: Stringed value.
        :param cast: Type to cast return value as. May be ``None`` (no
            cast), ``bool``/``list``/``tuple``/``dict``/``float`` (handled
            specially), a one-item list or tuple holding the item cast, a
            dict describing key/value casts, or any other callable.

        :returns: Casted value
        """
        if cast is None:
            return value
        elif cast is bool:
            try:
                value = int(value) != 0
            except ValueError:
                value = value.lower() in cls.BOOLEAN_TRUE_STRINGS
        elif isinstance(cast, list):
            value = list(map(cast[0], [x for x in value.split(',') if x]))
        elif isinstance(cast, tuple):
            val = value.strip('(').strip(')').split(',')
            value = tuple(map(cast[0], [x for x in val if x]))
        elif isinstance(cast, dict):
            key_cast = cast.get('key', str)
            value_cast = cast.get('value', str)
            value_cast_by_key = cast.get('cast', dict())
            # Split on the first '=' only, so values may contain '='.
            value = dict(map(
                lambda kv: (
                    key_cast(kv[0]),
                    cls.parse_value(
                        kv[1],
                        value_cast_by_key.get(kv[0], value_cast)
                    )
                ),
                [val.split('=', 1) for val in value.split(';') if val]
            ))
        elif cast is dict:
            # Split on the first '=' only, so values may contain '='.
            value = dict(
                [val.split('=', 1) for val in value.split(',') if val]
            )
        elif cast is list:
            value = [x for x in value.split(',') if x]
        elif cast is tuple:
            val = value.strip('(').strip(')').split(',')
            value = tuple([x for x in val if x])
        elif cast is float:
            # clean string
            float_str = re.sub(r'[^\d,.-]', '', value)
            # split for avoid thousand separator and different
            # locale comma/dot symbol
            parts = re.split(r'[,.]', float_str)
            if len(parts) == 1:
                float_str = parts[0]
            else:
                float_str = "{}.{}".format(''.join(parts[0:-1]), parts[-1])
            value = float(float_str)
        else:
            value = cast(value)
        return value

    @classmethod
    def db_url_config(cls, url, engine=None):
        """Parse an arbitrary database URL.

        Supports the following URL schemas:

        * PostgreSQL: ``postgres[ql]?://`` or ``p[g]?sql://``
        * PostGIS: ``postgis://``
        * MySQL: ``mysql://`` or ``mysql2://``
        * MySQL (GIS): ``mysqlgis://``
        * MySQL Connector Python from Oracle: ``mysql-connector://``
        * SQLite: ``sqlite://``
        * SQLite with SpatiaLite for GeoDjango: ``spatialite://``
        * Oracle: ``oracle://``
        * Microsoft SQL Server: ``mssql://``
        * PyODBC: ``pyodbc://``
        * Amazon Redshift: ``redshift://``
        * LDAP: ``ldap://``

        :param urllib.parse.ParseResult or str url:
            Database URL to parse.
        :param str or None engine:
            If None, the database engine is evaluated from the ``url``.
        :return: Parsed database URL; an empty dict when no engine could
            be determined.
        :rtype: dict
        """
        if not isinstance(url, cls.URL_CLASS):
            if url == 'sqlite://:memory:':
                # this is a special case, because if we pass this URL into
                # urlparse, urlparse will choke trying to interpret "memory"
                # as a port number
                return {
                    'ENGINE': cls.DB_SCHEMES['sqlite'],
                    'NAME': ':memory:'
                }
                # note: no other settings are required for sqlite
            url = urlparse(url)

        config = {}

        # Remove query strings.
        path = url.path[1:]
        path = unquote_plus(path.split('?', 2)[0])

        if url.scheme == 'sqlite':
            if path == '':
                # if we are using sqlite and we have no path, then assume we
                # want an in-memory database (this is the behaviour of
                # sqlalchemy)
                path = ':memory:'
            if url.netloc:
                warnings.warn('SQLite URL contains host component %r, '
                              'it will be ignored' % url.netloc, stacklevel=3)
        if url.scheme == 'ldap':
            path = '{scheme}://{hostname}'.format(
                scheme=url.scheme,
                hostname=url.hostname,
            )
            if url.port:
                path += ':{port}'.format(port=url.port)

        user_host = url.netloc.rsplit('@', 1)
        if url.scheme in cls.POSTGRES_FAMILY and ',' in user_host[-1]:
            # Parsing postgres cluster dsn
            hinfo = list(
                itertools.zip_longest(
                    *(
                        host.rsplit(':', 1)
                        for host in user_host[-1].split(',')
                    )
                )
            )
            hostname = ','.join(hinfo[0])
            port = ','.join(filter(None, hinfo[1])) if len(hinfo) == 2 else ''
        else:
            hostname = url.hostname
            port = url.port

        # Update with environment configuration.
        config.update({
            'NAME': path or '',
            'USER': _cast_urlstr(url.username) or '',
            'PASSWORD': _cast_urlstr(url.password) or '',
            'HOST': hostname or '',
            'PORT': _cast_int(port) or '',
        })

        # An absolute path here is split into HOST (directory part) and
        # NAME (last component).
        if (
                url.scheme in cls.POSTGRES_FAMILY and path.startswith('/')
                or cls.CLOUDSQL in path and path.startswith('/')
        ):
            config['HOST'], config['NAME'] = path.rsplit('/', 1)

        if url.scheme == 'oracle' and path == '':
            config['NAME'] = config['HOST']
            config['HOST'] = ''

        if url.scheme == 'oracle':
            # Django oracle/base.py strips port and fails on non-string value
            if not config['PORT']:
                del (config['PORT'])
            else:
                config['PORT'] = str(config['PORT'])

        if url.query:
            config_options = {}
            for k, v in parse_qs(url.query).items():
                if k.upper() in cls._DB_BASE_OPTIONS:
                    config.update({k.upper(): _cast(v[0])})
                else:
                    config_options.update({k: _cast_int(v[0])})
            config['OPTIONS'] = config_options

        if engine:
            config['ENGINE'] = engine
        else:
            config['ENGINE'] = url.scheme

        # Look the alias up on ``cls`` so subclasses that extend
        # ``DB_SCHEMES`` are honoured.
        if config['ENGINE'] in cls.DB_SCHEMES:
            config['ENGINE'] = cls.DB_SCHEMES[config['ENGINE']]

        if not config.get('ENGINE', False):
            warnings.warn("Engine not recognized from url: {}".format(config))
            return {}

        return config

    @classmethod
    def cache_url_config(cls, url, backend=None):
        """Parse an arbitrary cache URL.

        :param urllib.parse.ParseResult or str url:
            Cache URL to parse.
        :param str or None backend:
            If None, the backend is evaluated from the ``url``.
        :return: Parsed cache URL; an empty dict for an empty ``url``.
        :raises ImproperlyConfigured: If the URL scheme is not a key of
            ``CACHE_SCHEMES``.
        :rtype: dict
        """
        if not isinstance(url, cls.URL_CLASS):
            if not url:
                return {}
            else:
                url = urlparse(url)

        if url.scheme not in cls.CACHE_SCHEMES:
            raise ImproperlyConfigured(
                'Invalid cache schema {}'.format(url.scheme)
            )

        # Several comma-separated hosts give a list, one host a string.
        location = url.netloc.split(',')
        if len(location) == 1:
            location = location[0]

        config = {
            'BACKEND': cls.CACHE_SCHEMES[url.scheme],
            'LOCATION': location,
        }

        # Add the drive to LOCATION
        if url.scheme == 'filecache':
            config.update({
                'LOCATION': url.netloc + url.path,
            })

        # urlparse('pymemcache://127.0.0.1:11211')
        # => netloc='127.0.0.1:11211', path=''
        #
        # urlparse('pymemcache://memcached:11211/?key_prefix=ci')
        # => netloc='memcached:11211', path='/'
        #
        # urlparse('memcache:///tmp/memcached.sock')
        # => netloc='', path='/tmp/memcached.sock'
        if not url.netloc and url.scheme in ['memcache', 'pymemcache']:
            config.update({
                'LOCATION': 'unix:' + url.path,
            })
        elif url.scheme.startswith('redis'):
            if url.hostname:
                scheme = url.scheme.replace('cache', '')
            else:
                scheme = 'unix'
            locations = [scheme + '://' + loc + url.path
                         for loc in url.netloc.split(',')]
            if len(locations) == 1:
                config['LOCATION'] = locations[0]
            else:
                config['LOCATION'] = locations

        if url.query:
            config_options = {}
            for k, v in parse_qs(url.query).items():
                opt = {k.upper(): _cast(v[0])}
                if k.upper() in cls._CACHE_BASE_OPTIONS:
                    config.update(opt)
                else:
                    config_options.update(opt)
            config['OPTIONS'] = config_options

        if backend:
            config['BACKEND'] = backend

        return config

    @classmethod
    def email_url_config(cls, url, backend=None):
        """Parse an arbitrary email URL.

        :param urllib.parse.ParseResult or str url:
            Email URL to parse.
        :param str or None backend:
            If None, the backend is evaluated from the ``url``.
        :return: Parsed email URL.
        :raises ImproperlyConfigured: If no ``backend`` is given and the
            URL scheme is not a key of ``EMAIL_SCHEMES``.
        :rtype: dict
        """

        config = {}

        url = urlparse(url) if not isinstance(url, cls.URL_CLASS) else url

        # Remove query strings
        path = url.path[1:]
        path = unquote_plus(path.split('?', 2)[0])

        # Update with environment configuration
        config.update({
            'EMAIL_FILE_PATH': path,
            'EMAIL_HOST_USER': _cast_urlstr(url.username),
            'EMAIL_HOST_PASSWORD': _cast_urlstr(url.password),
            'EMAIL_HOST': url.hostname,
            'EMAIL_PORT': _cast_int(url.port),
        })

        if backend:
            config['EMAIL_BACKEND'] = backend
        elif url.scheme not in cls.EMAIL_SCHEMES:
            raise ImproperlyConfigured('Invalid email schema %s' % url.scheme)
        else:
            config['EMAIL_BACKEND'] = cls.EMAIL_SCHEMES[url.scheme]

        if url.scheme in ('smtps', 'smtp+tls'):
            config['EMAIL_USE_TLS'] = True
        elif url.scheme == 'smtp+ssl':
            config['EMAIL_USE_SSL'] = True

        if url.query:
            config_options = {}
            for k, v in parse_qs(url.query).items():
                opt = {k.upper(): _cast_int(v[0])}
                if k.upper() in cls._EMAIL_BASE_OPTIONS:
                    config.update(opt)
                else:
                    config_options.update(opt)
            config['OPTIONS'] = config_options

        return config

    @classmethod
    def search_url_config(cls, url, engine=None):
        """Parse an arbitrary search URL.

        :param urllib.parse.ParseResult or str url:
            Search URL to parse.
        :param str or None engine:
            If None, the engine is evaluated from the ``url``. When given,
            it overrides the engine for every supported scheme.
        :return: Parsed search URL.
        :raises ImproperlyConfigured: If the URL scheme is not a key of
            ``SEARCH_SCHEMES``.
        :rtype: dict
        """

        config = {}

        url = urlparse(url) if not isinstance(url, cls.URL_CLASS) else url

        # Remove query strings.
        path = url.path[1:]
        path = unquote_plus(path.split('?', 2)[0])

        if url.scheme not in cls.SEARCH_SCHEMES:
            raise ImproperlyConfigured(
                'Invalid search schema %s' % url.scheme
            )
        # Apply the explicit override here, before any early return, so it
        # takes effect for simple/solr/elasticsearch URLs as well.
        config["ENGINE"] = engine or cls.SEARCH_SCHEMES[url.scheme]

        # check commons params
        params = {}  # type: dict
        if url.query:
            params = parse_qs(url.query)
            if 'EXCLUDED_INDEXES' in params.keys():
                config['EXCLUDED_INDEXES'] \
                    = params['EXCLUDED_INDEXES'][0].split(',')
            if 'INCLUDE_SPELLING' in params.keys():
                config['INCLUDE_SPELLING'] = cls.parse_value(
                    params['INCLUDE_SPELLING'][0],
                    bool
                )
            if 'BATCH_SIZE' in params.keys():
                config['BATCH_SIZE'] = cls.parse_value(
                    params['BATCH_SIZE'][0],
                    int
                )

        if url.scheme == 'simple':
            return config
        elif url.scheme in ['solr'] + cls.ELASTICSEARCH_FAMILY:
            if 'KWARGS' in params.keys():
                config['KWARGS'] = params['KWARGS'][0]

        # remove trailing slash
        if path.endswith("/"):
            path = path[:-1]

        if url.scheme == 'solr':
            config['URL'] = urlunparse(
                ('http',) + url[1:2] + (path,) + ('', '', '')
            )
            if 'TIMEOUT' in params.keys():
                config['TIMEOUT'] = cls.parse_value(params['TIMEOUT'][0], int)
            return config

        if url.scheme in cls.ELASTICSEARCH_FAMILY:
            # The last path component is the index name; the rest stays
            # part of the URL.
            split = path.rsplit("/", 1)

            if len(split) > 1:
                path = "/".join(split[:-1])
                index = split[-1]
            else:
                path = ""
                index = split[0]

            config['URL'] = urlunparse(
                ('http',) + url[1:2] + (path,) + ('', '', '')
            )
            if 'TIMEOUT' in params.keys():
                config['TIMEOUT'] = cls.parse_value(params['TIMEOUT'][0], int)
            config['INDEX_NAME'] = index
            return config

        config['PATH'] = '/' + path

        if url.scheme == 'whoosh':
            if 'STORAGE' in params.keys():
                config['STORAGE'] = params['STORAGE'][0]
            if 'POST_LIMIT' in params.keys():
                config['POST_LIMIT'] = cls.parse_value(
                    params['POST_LIMIT'][0],
                    int
                )
        elif url.scheme == 'xapian':
            if 'FLAGS' in params.keys():
                config['FLAGS'] = params['FLAGS'][0]

        return config

    @classmethod
    def read_env(cls, env_file=None, overwrite=False, **overrides):
        r"""Read a .env file into os.environ.

        If not given a path to a dotenv path, does filthy magic stack
        backtracking to find the dotenv in the same directory as the file that
        called ``read_env``.

        Existing environment variables take precedent and are NOT overwritten
        by the file content. ``overwrite=True`` will force an overwrite of
        existing environment variables.

        Refs:

        * https://wellfire.co/learn/easier-12-factor-django

        :param env_file: The path to the ``.env`` file your application should
            use, or an already-open file object. If omitted, ``.env`` next
            to the calling file is used.
        :param overwrite: ``overwrite=True`` will force an overwrite of
            existing environment variables.
        :param \**overrides: Any additional keyword arguments provided directly
            to read_env will be added to the environment. If the key matches an
            existing environment variable, the value will be overridden.
        """
        if env_file is None:
            frame = sys._getframe()
            env_file = os.path.join(
                os.path.dirname(frame.f_back.f_code.co_filename),
                '.env'
            )
            if not os.path.exists(env_file):
                logger.info(
                    "%s doesn't exist - if you're not configuring your "
                    "environment separately, create one." % env_file)
                return

        try:
            if isinstance(env_file, Openable):
                # Python 3.5 support (wrap path with str).
                with open(str(env_file)) as f:
                    content = f.read()
            else:
                with env_file as f:
                    content = f.read()
        except OSError:
            logger.info(
                "%s not found - if you're not configuring your "
                "environment separately, check this." % env_file)
            return

        logger.debug('Read environment variables from: {}'.format(env_file))

        def _keep_escaped_format_characters(match):
            """Keep escaped newline/tabs in quoted strings"""
            escaped_char = match.group(1)
            if escaped_char in 'rnt':
                return '\\' + escaped_char
            return escaped_char

        for line in content.splitlines():
            m1 = re.match(r'\A(?:export )?([A-Za-z_0-9]+)=(.*)\Z', line)
            if m1:
                key, val = m1.group(1), m1.group(2)
                # Single-quoted values are taken literally.
                m2 = re.match(r"\A'(.*)'\Z", val)
                if m2:
                    val = m2.group(1)
                # Double-quoted values have backslash escapes processed.
                m3 = re.match(r'\A"(.*)"\Z', val)
                if m3:
                    val = re.sub(r'\\(.)', _keep_escaped_format_characters,
                                 m3.group(1))
                overrides[key] = str(val)
            elif not line or line.startswith('#'):
                # ignore warnings for empty line-breaks or comments
                pass
            else:
                logger.warning('Invalid line: %s', line)

        def set_environ(envval):
            """Return lambda to set environ.

            Use setdefault unless overwrite is specified.
            """
            if overwrite:
                return lambda k, v: envval.update({k: str(v)})
            return lambda k, v: envval.setdefault(k, str(v))

        setenv = set_environ(cls.ENVIRON)

        for key, value in overrides.items():
            setenv(key, value)
class FileAwareEnv(Env):
    """
    ``Env`` variant whose ``ENVIRON`` is a ``FileAwareMapping``, intended
    for variables supplied through a companion ``<NAME>_FILE`` variable.

    It is meant as a drop-in replacement for the standard ``environ.Env``:

    .. code-block:: python

        env = environ.FileAwareEnv()

    For example, if a ``SECRET_KEY_FILE`` environment variable was set,
    ``env("SECRET_KEY")`` would find the related variable, returning the file
    contents rather than ever looking up a ``SECRET_KEY`` environment variable.
    """

    # The lookup behaviour lives in the mapping; this class only swaps the
    # mapping that ``Env`` reads from.
    ENVIRON = FileAwareMapping()
class Path:
    """Inspired to Django Two-scoops, handling File Paths in Settings.

    A ``Path`` wraps an absolute directory (``root``). Calling the
    instance joins sub-paths onto that root and returns an absolute
    string. The operators ``+``, ``-`` and ``~`` build new ``Path``
    objects.
    """

    def path(self, *paths, **kwargs):
        """Create new Path based on self.root and provided paths.

        :param paths: List of sub paths
        :param kwargs: required=False
        :rtype: Path
        """
        return self.__class__(self.__root__, *paths, **kwargs)

    def file(self, name, *args, **kwargs):
        r"""Open a file.

        :param str name: Filename appended to :py:attr:`~root`
        :param \*args: ``*args`` passed to :py:func:`open`
        :param \**kwargs: ``**kwargs`` passed to :py:func:`open`
        :rtype: typing.IO[typing.Any]
        """
        return open(self(name), *args, **kwargs)

    @property
    def root(self):
        """Current directory for this Path"""
        return self.__root__

    def __init__(self, start='', *paths, **kwargs):
        """Build the absolute root from ``start`` joined with ``paths``.

        :param start: Base path; with ``is_file=True`` its directory name
            is used instead.
        :param paths: Sub paths joined onto ``start``.
        :param kwargs: ``is_file=False``, ``required=False``.
        :raises ImproperlyConfigured: If ``required=True`` and the
            resulting path does not exist.
        """

        super().__init__()

        if kwargs.get('is_file', False):
            start = os.path.dirname(start)

        self.__root__ = self._absolute_join(start, *paths, **kwargs)

    def __call__(self, *paths, **kwargs):
        """Retrieve the absolute path, with appended paths

        :param paths: List of sub path of self.root
        :param kwargs: required=False
        """
        return self._absolute_join(self.__root__, *paths, **kwargs)

    def __eq__(self, other):
        # Compare root to root, or root to any other value (e.g. a str).
        if isinstance(other, Path):
            return self.__root__ == other.__root__
        return self.__root__ == other

    def __ne__(self, other):
        return not self.__eq__(other)

    def __add__(self, other):
        # ``path + x`` joins ``x`` (a Path or a plain sub path) onto root.
        if not isinstance(other, Path):
            return Path(self.__root__, other)
        return Path(self.__root__, other.__root__)

    def __sub__(self, other):
        """Go up ``other`` levels (int) or remove the suffix ``other`` (str).

        :raises TypeError: For any other operand, or a ``str`` that the
            root does not end with.
        """
        if isinstance(other, int):
            return self.path('../' * other)
        elif isinstance(other, str):
            if self.__root__.endswith(other):
                # Cut off exactly the matched suffix. ``str.rstrip`` would
                # strip a *set of characters* and could eat more than the
                # suffix (e.g. '/a/cc' - 'c' must give '/a/c').
                cut = len(self.__root__) - len(other)
                return Path(self.__root__[:cut])
        raise TypeError(
            "unsupported operand type(s) for -: '{self}' and '{other}' "
            "unless value of {self} ends with value of {other}".format(
                self=type(self), other=type(other)
            )
        )

    def __invert__(self):
        # ``~path`` is the parent directory.
        return self.path('..')

    def __contains__(self, item):
        # ``item in path``: item's root lies below this root. A trailing
        # separator is added so '/ab' is not considered inside '/a'.
        base_path = self.__root__
        if len(base_path) > 1:
            base_path = os.path.join(base_path, '')
        return item.__root__.startswith(base_path)

    def __repr__(self):
        return "<Path:{}>".format(self.__root__)

    def __str__(self):
        return self.__root__

    def __unicode__(self):
        return self.__str__()

    def __getitem__(self, *args, **kwargs):
        # Index/slice the root string.
        return self.__str__().__getitem__(*args, **kwargs)

    def __fspath__(self):
        return self.__str__()

    def rfind(self, *args, **kwargs):
        """Delegate to ``str.rfind`` on the root string."""
        return self.__str__().rfind(*args, **kwargs)

    def find(self, *args, **kwargs):
        """Delegate to ``str.find`` on the root string."""
        return self.__str__().find(*args, **kwargs)

    @staticmethod
    def _absolute_join(base, *paths, **kwargs):
        """Join ``paths`` onto ``base`` and return the absolute result.

        :raises ImproperlyConfigured: If ``required=True`` is passed and
            the resulting path does not exist.
        """
        absolute_path = os.path.abspath(os.path.join(base, *paths))
        if kwargs.get('required', False) and not os.path.exists(absolute_path):
            raise ImproperlyConfigured(
                "Create required path: {}".format(absolute_path))
        return absolute_path
def register_scheme(scheme):
    """Add ``scheme`` to every ``uses_*`` list of :mod:`urllib.parse`.

    Registration is idempotent: a scheme already present in a list is not
    appended again, so repeated calls do not grow the lists.

    :param str scheme: URL scheme name, e.g. ``'postgres'``.
    """
    for method in dir(urlparselib):
        if method.startswith('uses_'):
            registry = getattr(urlparselib, method)
            if scheme not in registry:
                registry.append(scheme)
def register_schemes(schemes):
    """Register each scheme of the iterable ``schemes`` in turn.

    :param schemes: Iterable of URL scheme names.
    """
    for name in schemes:
        register_scheme(name)
# Register the database, cache, search and email schemes in URLs.
# Iterating a dict yields its keys; the four families are registered in
# the same order as before.
register_schemes(itertools.chain(
    Env.DB_SCHEMES,
    Env.CACHE_SCHEMES,
    Env.SEARCH_SCHEMES,
    Env.EMAIL_SCHEMES,
))