Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/contrib/messages/storage/cookie.py: 28%
88 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1import binascii
2import json
4from django.conf import settings
5from django.contrib.messages.storage.base import BaseStorage, Message
6from django.core import signing
7from django.http import SimpleCookie
8from django.utils.safestring import SafeData, mark_safe
class MessageEncoder(json.JSONEncoder):
    """
    JSON encoder that writes ``Message`` instances in a compact list form.

    A message is emitted as ``[message_key, is_safedata, level, message]``,
    followed by ``extra_tags`` only when the message has any.
    """

    # Marker placed first in the list so a serialized message can be
    # recognized again when decoding.
    message_key = "__json_message"

    def default(self, obj):
        """
        Return the list form of a ``Message``; defer every other object to
        the base encoder.
        """
        if not isinstance(obj, Message):
            return super().default(obj)

        # An integer flag (0/1) is shorter in the JSON output than
        # false/true.
        safe_flag = int(isinstance(obj.message, SafeData))
        serialized = [self.message_key, safe_flag, obj.level, obj.message]
        if obj.extra_tags:
            serialized.append(obj.extra_tags)
        return serialized
class MessageDecoder(json.JSONDecoder):
    """
    JSON decoder that rebuilds ``Message`` instances found in the data.
    """

    def process_messages(self, obj):
        """
        Walk ``obj`` recursively and replace each serialized message with a
        ``Message`` instance.

        A non-empty list whose first item is ``MessageEncoder.message_key``
        is treated as a serialized message. Other lists and dicts are
        rebuilt with their items processed; anything else is returned as is.
        """
        if isinstance(obj, dict):
            return {
                name: self.process_messages(member) for name, member in obj.items()
            }
        if not isinstance(obj, list) or not obj:
            return obj

        if obj[0] == MessageEncoder.message_key:
            is_safedata = obj[1]
            if is_safedata:
                # Restore the safe-string marking recorded by the encoder.
                obj[3] = mark_safe(obj[3])
            # Everything after the key and the flag is passed positionally.
            return Message(*obj[2:])
        return [self.process_messages(element) for element in obj]

    def decode(self, s, **kwargs):
        """
        Decode ``s`` with the base decoder, then revive any messages in it.
        """
        return self.process_messages(super().decode(s, **kwargs))
class MessageSerializer:
    """
    Serializer pairing ``MessageEncoder`` and ``MessageDecoder``; converts
    between Python objects and latin-1 encoded JSON bytes.
    """

    def dumps(self, obj):
        """
        Return ``obj`` as compact JSON (no whitespace after separators),
        encoded to latin-1 bytes.
        """
        text = json.dumps(obj, separators=(",", ":"), cls=MessageEncoder)
        return text.encode("latin-1")

    def loads(self, data):
        """
        Decode latin-1 ``data`` bytes and parse the resulting JSON text.
        """
        text = data.decode("latin-1")
        return json.loads(text, cls=MessageDecoder)
class CookieStorage(BaseStorage):
    """
    Message storage backend that keeps the messages in a signed cookie.
    """

    cookie_name = "messages"
    # uwsgi's default configuration enforces a maximum size of 4kb for all the
    # HTTP headers. To leave room for other cookies and headers, this cookie
    # is limited to half of that. See #18781.
    max_cookie_size = 2048
    # Sentinel appended to the stored list when some messages did not fit.
    not_finished = "__messagesnotfinished__"
    key_salt = "django.contrib.messages"

    def __init__(self, *args, **kwargs):
        """
        Initialize the base storage and create the cookie signer, salted
        with ``key_salt``.
        """
        super().__init__(*args, **kwargs)
        self.signer = signing.get_cookie_signer(salt=self.key_salt)

    def _get(self, *args, **kwargs):
        """
        Read the messages cookie and return ``(messages, all_retrieved)``.

        When the decoded list ends with the ``not_finished`` sentinel, the
        sentinel is removed and ``all_retrieved`` is False, signalling that
        this storage does not hold every message.
        """
        stored = self._decode(self.request.COOKIES.get(self.cookie_name))
        truncated = bool(stored) and stored[-1] == self.not_finished
        if truncated:
            # Drop the sentinel so that only real messages are returned.
            stored.pop()
        return stored, not truncated

    def _update_cookie(self, encoded_data, response):
        """
        Set the messages cookie on ``response`` when there is encoded data
        to store; otherwise delete the cookie.
        """
        # Options passed both when setting and when deleting the cookie.
        shared_options = {
            "domain": settings.SESSION_COOKIE_DOMAIN,
            "samesite": settings.SESSION_COOKIE_SAMESITE,
        }
        if not encoded_data:
            response.delete_cookie(self.cookie_name, **shared_options)
            return
        response.set_cookie(
            self.cookie_name,
            encoded_data,
            # A falsy setting is passed on as None rather than False.
            secure=settings.SESSION_COOKIE_SECURE or None,
            httponly=settings.SESSION_COOKIE_HTTPONLY or None,
            **shared_options,
        )

    def _store(self, messages, response, remove_oldest=True, *args, **kwargs):
        """
        Write ``messages`` to the cookie and return the list of messages
        that could not be stored.

        While the encoded data exceeds ``max_cookie_size``, one message at a
        time is moved out of ``messages`` (the oldest first when
        ``remove_oldest`` is true, otherwise the newest) and the remainder is
        re-encoded with the ``not_finished`` sentinel appended. ``messages``
        is modified in place.
        """
        leftover = []
        payload = self._encode(messages)
        size_limit = self.max_cookie_size
        if size_limit:
            # The data is eventually stored by SimpleCookie, which adds its
            # own overhead, so the length is measured after its value
            # encoding. One instance is reused for every measurement.
            jar = SimpleCookie()
            while payload and len(jar.value_encode(payload)[1]) > size_limit:
                if remove_oldest:
                    dropped = messages.pop(0)
                    leftover.append(dropped)
                else:
                    dropped = messages.pop()
                    leftover.insert(0, dropped)
                payload = self._encode(
                    messages + [self.not_finished], encode_empty=leftover
                )
        self._update_cookie(payload, response)
        return leftover

    def _encode(self, messages, encode_empty=False):
        """
        Return a signed, compressed encoding of ``messages`` that can be
        stored as plain text, or None when there is nothing to encode.

        The data comes back from the client, so it is signed to detect
        tampering. An empty list is only encoded when ``encode_empty`` is
        truthy.
        """
        if not messages and not encode_empty:
            return None
        return self.signer.sign_object(
            messages, serializer=MessageSerializer, compress=True
        )

    def _decode(self, data):
        """
        Safely turn encoded cookie text back into a list of messages.

        Return None when ``data`` is empty, carries an invalid signature or
        is not in a valid format. In the latter two cases the storage is
        also marked as used so that the cookie gets removed.
        """
        if not data:
            return None

        legacy_text = None
        try:
            return self.signer.unsign_object(data, serializer=MessageSerializer)
        # RemovedInDjango41Warning: when the deprecation ends, replace with:
        #
        # except (signing.BadSignature, json.JSONDecodeError):
        #     pass
        except signing.BadSignature:
            pass
        except (binascii.Error, json.JSONDecodeError):
            # The value is not in the object format; unsign it as plain text.
            legacy_text = self.signer.unsign(data)

        if legacy_text:
            # RemovedInDjango41Warning.
            try:
                return json.loads(legacy_text, cls=MessageDecoder)
            except json.JSONDecodeError:
                pass

        # Something was wrong with the data: mark it as used so that it gets
        # removed.
        self.used = True
        return None