Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/utils/connection.py: 85%

53 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1from asgiref.local import Local 

2 

3from django.conf import settings as django_settings 

4from django.utils.functional import cached_property 

5 

6 

class ConnectionProxy:
    """Proxy for accessing a connection object's attributes.

    The proxied object is looked up as ``connections[alias]`` on every
    operation instead of being captured once, so the proxy always reflects
    whatever the container currently returns for that alias.
    """

    # Defining __eq__ without __hash__ makes Python set __hash__ to None.
    # State it explicitly so the unhashability is visible to readers.
    __hash__ = None

    def __init__(self, connections, alias):
        """Remember the container and the alias to resolve inside it.

        Args:
            connections: Object supporting ``connections[alias]`` lookup.
            alias: Key used to fetch the proxied object on each access.
        """
        # Write through __dict__ directly: the overridden __setattr__ below
        # would otherwise forward these two assignments to the target.
        self.__dict__["_connections"] = connections
        self.__dict__["_alias"] = alias

    def __getattr__(self, item):
        """Return attribute ``item`` of the currently resolved target."""
        # Only invoked when normal lookup fails, so _connections and _alias
        # (stored in the instance __dict__) never reach this method.
        return getattr(self._connections[self._alias], item)

    def __setattr__(self, name, value):
        """Set attribute ``name`` on the target, not on the proxy."""
        return setattr(self._connections[self._alias], name, value)

    def __delattr__(self, name):
        """Delete attribute ``name`` from the target, not from the proxy."""
        return delattr(self._connections[self._alias], name)

    def __contains__(self, key):
        """Return whether ``key`` is contained in the target."""
        return key in self._connections[self._alias]

    def __eq__(self, other):
        """Compare the target (rather than the proxy itself) with ``other``."""
        return self._connections[self._alias] == other

28 

29 

class ConnectionDoesNotExist(Exception):
    """Raised when a connection is requested under an alias that is not configured."""

32 

33 

class BaseConnectionHandler:
    """Create connection objects on demand and cache them by alias.

    Subclasses must implement ``create_connection()`` and normally set
    ``settings_name``. Created connections are stored as attributes of a
    ``Local`` instance built in ``__init__``.
    """

    # Name of the attribute read from django.conf.settings when no explicit
    # settings object is passed to the constructor.
    settings_name = None
    # Exception type raised by __getitem__ for an unknown alias.
    exception_class = ConnectionDoesNotExist
    # Passed straight to Local() as its only argument.
    thread_critical = False

    def __init__(self, settings=None):
        """Store the optional explicit settings and create the connection store.

        Args:
            settings: Alias-keyed configuration. When ``None``, it is read
                from the Django settings the first time ``settings`` is used.
        """
        self._settings = settings
        self._connections = Local(self.thread_critical)

    @cached_property
    def settings(self):
        """Return the configuration after passing it through configure_settings().

        The resolved value is also written back to ``_settings``.
        """
        self._settings = self.configure_settings(self._settings)
        return self._settings

    def configure_settings(self, settings):
        """Return ``settings``, falling back to the Django setting ``settings_name``.

        Args:
            settings: Explicit configuration, or ``None`` to use the fallback.

        Returns:
            The configuration that aliases are looked up in.
        """
        if settings is None:
            settings = getattr(django_settings, self.settings_name)
        return settings

    def create_connection(self, alias):
        """Build the connection object for ``alias``.

        Raises:
            NotImplementedError: Always; subclasses must override this hook.
        """
        raise NotImplementedError("Subclasses must implement create_connection().")

    def __getitem__(self, alias):
        """Return the connection for ``alias``, creating and caching it if needed.

        Raises:
            exception_class: If ``alias`` is not a key of ``settings``.
        """
        try:
            return getattr(self._connections, alias)
        except AttributeError:
            # Not created yet: refuse aliases that are not configured.
            if alias not in self.settings:
                raise self.exception_class(f"The connection '{alias}' doesn't exist.")
        conn = self.create_connection(alias)
        setattr(self._connections, alias, conn)
        return conn

    def __setitem__(self, key, value):
        """Store ``value`` as the connection for ``key`` without validating the alias."""
        setattr(self._connections, key, value)

    def __delitem__(self, key):
        """Drop the stored connection for ``key``.

        Raises:
            AttributeError: If nothing is stored under ``key``.
        """
        delattr(self._connections, key)

    def __iter__(self):
        """Iterate over the configured aliases (the keys of ``settings``)."""
        return iter(self.settings)

    def all(self, initialized_only=False):
        """Return the connections for the configured aliases.

        Args:
            initialized_only: When false (the default), every configured alias
                is resolved through ``__getitem__``, creating connections that
                do not exist yet. When true, only connections already stored
                are returned and nothing new is created.

        Returns:
            A list of connection objects in alias iteration order.
        """
        return [
            self[alias]
            for alias in self
            # Skip aliases with no stored connection when asked to.
            if not initialized_only or hasattr(self._connections, alias)
        ]