Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/contrib/messages/storage/cookie.py: 28%

88 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1import binascii 

2import json 

3 

4from django.conf import settings 

5from django.contrib.messages.storage.base import BaseStorage, Message 

6from django.core import signing 

7from django.http import SimpleCookie 

8from django.utils.safestring import SafeData, mark_safe 

9 

10 

11class MessageEncoder(json.JSONEncoder): 

12 """ 

13 Compactly serialize instances of the ``Message`` class as JSON. 

14 """ 

15 

16 message_key = "__json_message" 

17 

18 def default(self, obj): 

19 if isinstance(obj, Message): 

20 # Using 0/1 here instead of False/True to produce more compact json 

21 is_safedata = 1 if isinstance(obj.message, SafeData) else 0 

22 message = [self.message_key, is_safedata, obj.level, obj.message] 

23 if obj.extra_tags: 

24 message.append(obj.extra_tags) 

25 return message 

26 return super().default(obj) 

27 

28 

29class MessageDecoder(json.JSONDecoder): 

30 """ 

31 Decode JSON that includes serialized ``Message`` instances. 

32 """ 

33 

34 def process_messages(self, obj): 

35 if isinstance(obj, list) and obj: 

36 if obj[0] == MessageEncoder.message_key: 

37 if obj[1]: 

38 obj[3] = mark_safe(obj[3]) 

39 return Message(*obj[2:]) 

40 return [self.process_messages(item) for item in obj] 

41 if isinstance(obj, dict): 

42 return {key: self.process_messages(value) for key, value in obj.items()} 

43 return obj 

44 

45 def decode(self, s, **kwargs): 

46 decoded = super().decode(s, **kwargs) 

47 return self.process_messages(decoded) 

48 

49 

50class MessageSerializer: 

51 def dumps(self, obj): 

52 return json.dumps( 

53 obj, 

54 separators=(",", ":"), 

55 cls=MessageEncoder, 

56 ).encode("latin-1") 

57 

58 def loads(self, data): 

59 return json.loads(data.decode("latin-1"), cls=MessageDecoder) 

60 

61 

62class CookieStorage(BaseStorage): 

63 """ 

64 Store messages in a cookie. 

65 """ 

66 

67 cookie_name = "messages" 

68 # uwsgi's default configuration enforces a maximum size of 4kb for all the 

69 # HTTP headers. In order to leave some room for other cookies and headers, 

70 # restrict the session cookie to 1/2 of 4kb. See #18781. 

71 max_cookie_size = 2048 

72 not_finished = "__messagesnotfinished__" 

73 key_salt = "django.contrib.messages" 

74 

75 def __init__(self, *args, **kwargs): 

76 super().__init__(*args, **kwargs) 

77 self.signer = signing.get_cookie_signer(salt=self.key_salt) 

78 

79 def _get(self, *args, **kwargs): 

80 """ 

81 Retrieve a list of messages from the messages cookie. If the 

82 not_finished sentinel value is found at the end of the message list, 

83 remove it and return a result indicating that not all messages were 

84 retrieved by this storage. 

85 """ 

86 data = self.request.COOKIES.get(self.cookie_name) 

87 messages = self._decode(data) 

88 all_retrieved = not (messages and messages[-1] == self.not_finished) 

89 if messages and not all_retrieved: 

90 # remove the sentinel value 

91 messages.pop() 

92 return messages, all_retrieved 

93 

94 def _update_cookie(self, encoded_data, response): 

95 """ 

96 Either set the cookie with the encoded data if there is any data to 

97 store, or delete the cookie. 

98 """ 

99 if encoded_data: 

100 response.set_cookie( 

101 self.cookie_name, 

102 encoded_data, 

103 domain=settings.SESSION_COOKIE_DOMAIN, 

104 secure=settings.SESSION_COOKIE_SECURE or None, 

105 httponly=settings.SESSION_COOKIE_HTTPONLY or None, 

106 samesite=settings.SESSION_COOKIE_SAMESITE, 

107 ) 

108 else: 

109 response.delete_cookie( 

110 self.cookie_name, 

111 domain=settings.SESSION_COOKIE_DOMAIN, 

112 samesite=settings.SESSION_COOKIE_SAMESITE, 

113 ) 

114 

115 def _store(self, messages, response, remove_oldest=True, *args, **kwargs): 

116 """ 

117 Store the messages to a cookie and return a list of any messages which 

118 could not be stored. 

119 

120 If the encoded data is larger than ``max_cookie_size``, remove 

121 messages until the data fits (these are the messages which are 

122 returned), and add the not_finished sentinel value to indicate as much. 

123 """ 

124 unstored_messages = [] 

125 encoded_data = self._encode(messages) 

126 if self.max_cookie_size: 

127 # data is going to be stored eventually by SimpleCookie, which 

128 # adds its own overhead, which we must account for. 

129 cookie = SimpleCookie() # create outside the loop 

130 

131 def stored_length(val): 

132 return len(cookie.value_encode(val)[1]) 

133 

134 while encoded_data and stored_length(encoded_data) > self.max_cookie_size: 

135 if remove_oldest: 

136 unstored_messages.append(messages.pop(0)) 

137 else: 

138 unstored_messages.insert(0, messages.pop()) 

139 encoded_data = self._encode( 

140 messages + [self.not_finished], encode_empty=unstored_messages 

141 ) 

142 self._update_cookie(encoded_data, response) 

143 return unstored_messages 

144 

145 def _encode(self, messages, encode_empty=False): 

146 """ 

147 Return an encoded version of the messages list which can be stored as 

148 plain text. 

149 

150 Since the data will be retrieved from the client-side, the encoded data 

151 also contains a hash to ensure that the data was not tampered with. 

152 """ 

153 if messages or encode_empty: 

154 return self.signer.sign_object( 

155 messages, serializer=MessageSerializer, compress=True 

156 ) 

157 

158 def _decode(self, data): 

159 """ 

160 Safely decode an encoded text stream back into a list of messages. 

161 

162 If the encoded text stream contained an invalid hash or was in an 

163 invalid format, return None. 

164 """ 

165 if not data: 

166 return None 

167 try: 

168 return self.signer.unsign_object(data, serializer=MessageSerializer) 

169 # RemovedInDjango41Warning: when the deprecation ends, replace with: 

170 # 

171 # except (signing.BadSignature, json.JSONDecodeError): 

172 # pass 

173 except signing.BadSignature: 

174 decoded = None 

175 except (binascii.Error, json.JSONDecodeError): 

176 decoded = self.signer.unsign(data) 

177 

178 if decoded: 

179 # RemovedInDjango41Warning. 

180 try: 

181 return json.loads(decoded, cls=MessageDecoder) 

182 except json.JSONDecodeError: 

183 pass 

184 # Mark the data as used (so it gets removed) since something was wrong 

185 # with the data. 

186 self.used = True 

187 return None