Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/utils/connection.py: 85%
53 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1from asgiref.local import Local
3from django.conf import settings as django_settings
4from django.utils.functional import cached_property
class ConnectionProxy:
    """Proxy for accessing a connection object's attributes.

    Every operation is forwarded to ``connections[alias]``. The target is
    looked up again on each access, so the proxy always reflects whatever
    object the container currently returns for that alias.
    """

    def __init__(self, connections, alias):
        """Remember the container and the alias to resolve on each access."""
        # Write straight into __dict__: a plain ``self.x = ...`` would go
        # through __setattr__ below and be forwarded to the proxied object.
        self.__dict__["_connections"] = connections
        self.__dict__["_alias"] = alias

    def __getattr__(self, item):
        """Return attribute ``item`` of the proxied connection."""
        # Only reached when normal lookup fails, so ``_connections`` and
        # ``_alias`` (stored in __dict__) never recurse into this method.
        return getattr(self._connections[self._alias], item)

    def __setattr__(self, name, value):
        """Set attribute ``name`` on the proxied connection."""
        return setattr(self._connections[self._alias], name, value)

    def __delattr__(self, name):
        """Delete attribute ``name`` from the proxied connection."""
        return delattr(self._connections[self._alias], name)

    def __contains__(self, key):
        """Return whether ``key`` is in the proxied connection."""
        return key in self._connections[self._alias]

    # NOTE: defining __eq__ without __hash__ leaves instances unhashable.
    def __eq__(self, other):
        """Compare the proxied connection (not the proxy itself) to ``other``."""
        return self._connections[self._alias] == other
class ConnectionDoesNotExist(Exception):
    """Raised when a connection alias is requested that is not configured."""
class BaseConnectionHandler:
    """Alias-keyed container that creates connections lazily.

    ``handler[alias]`` returns the stored connection for ``alias``, creating
    it through ``create_connection()`` on first access. Subclasses must
    implement ``create_connection()`` and normally set ``settings_name``.
    """

    # Attribute name read from the Django settings object when no explicit
    # settings are passed to the constructor.
    settings_name = None
    # Exception type raised when an alias is not present in the settings.
    exception_class = ConnectionDoesNotExist
    # Forwarded as the sole argument to asgiref's ``Local`` storage.
    thread_critical = False

    def __init__(self, settings=None):
        """Store the optional explicit settings and create empty storage."""
        self._settings = settings
        self._connections = Local(self.thread_critical)

    @cached_property
    def settings(self):
        """Return the configured settings, resolving them on first access."""
        self._settings = self.configure_settings(self._settings)
        return self._settings

    def configure_settings(self, settings):
        """Return ``settings``, falling back to the Django setting if None."""
        if settings is None:
            settings = getattr(django_settings, self.settings_name)
        return settings

    def create_connection(self, alias):
        """Create the connection for ``alias``; subclasses must override."""
        raise NotImplementedError("Subclasses must implement create_connection().")

    def __getitem__(self, alias):
        """Return the connection for ``alias``, creating it if necessary.

        Raises ``exception_class`` when ``alias`` is not in the settings.
        """
        try:
            return getattr(self._connections, alias)
        except AttributeError:
            # Nothing stored yet: only configured aliases may be created.
            if alias not in self.settings:
                raise self.exception_class(f"The connection '{alias}' doesn't exist.")
        conn = self.create_connection(alias)
        setattr(self._connections, alias, conn)
        return conn

    def __setitem__(self, key, value):
        """Store ``value`` as the connection for alias ``key``."""
        setattr(self._connections, key, value)

    def __delitem__(self, key):
        """Remove the stored connection for alias ``key``."""
        delattr(self._connections, key)

    def __iter__(self):
        """Iterate over the configured settings (not the stored connections)."""
        return iter(self.settings)

    def all(self):
        """Return a connection for every configured alias.

        Goes through ``__getitem__``, so any alias without a stored
        connection gets one created.
        """
        return [self[alias] for alias in self]