Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/rest_framework/validators.py: 43%

143 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1""" 

2We perform uniqueness checks explicitly on the serializer class, rather 

3the using Django's `.full_clean()`. 

4 

5This gives us better separation of concerns, allows us to use single-step 

6object creation, and makes it possible to switch between using the implicit 

7`ModelSerializer` class and an equivalent explicit `Serializer` class. 

8""" 

9from django.db import DataError 

10from django.utils.translation import gettext_lazy as _ 

11 

12from rest_framework.exceptions import ValidationError 

13from rest_framework.utils.representation import smart_repr 

14 

15 

16# Robust filter and exist implementations. Ensures that queryset.exists() for 

17# an invalid value returns `False`, rather than raising an error. 

18# Refs https://github.com/encode/django-rest-framework/issues/3381 

19def qs_exists(queryset): 

20 try: 

21 return queryset.exists() 

22 except (TypeError, ValueError, DataError): 

23 return False 

24 

25 

26def qs_filter(queryset, **kwargs): 

27 try: 

28 return queryset.filter(**kwargs) 

29 except (TypeError, ValueError, DataError): 

30 return queryset.none() 

31 

32 

33class UniqueValidator: 

34 """ 

35 Validator that corresponds to `unique=True` on a model field. 

36 

37 Should be applied to an individual field on the serializer. 

38 """ 

39 message = _('This field must be unique.') 

40 requires_context = True 

41 

42 def __init__(self, queryset, message=None, lookup='exact'): 

43 self.queryset = queryset 

44 self.message = message or self.message 

45 self.lookup = lookup 

46 

47 def filter_queryset(self, value, queryset, field_name): 

48 """ 

49 Filter the queryset to all instances matching the given attribute. 

50 """ 

51 filter_kwargs = {'%s__%s' % (field_name, self.lookup): value} 

52 return qs_filter(queryset, **filter_kwargs) 

53 

54 def exclude_current_instance(self, queryset, instance): 

55 """ 

56 If an instance is being updated, then do not include 

57 that instance itself as a uniqueness conflict. 

58 """ 

59 if instance is not None: 59 ↛ 60line 59 didn't jump to line 60, because the condition on line 59 was never true

60 return queryset.exclude(pk=instance.pk) 

61 return queryset 

62 

63 def __call__(self, value, serializer_field): 

64 # Determine the underlying model field name. This may not be the 

65 # same as the serializer field name if `source=<>` is set. 

66 field_name = serializer_field.source_attrs[-1] 

67 # Determine the existing instance, if this is an update operation. 

68 instance = getattr(serializer_field.parent, 'instance', None) 

69 

70 queryset = self.queryset 

71 queryset = self.filter_queryset(value, queryset, field_name) 

72 queryset = self.exclude_current_instance(queryset, instance) 

73 if qs_exists(queryset): 73 ↛ 74line 73 didn't jump to line 74, because the condition on line 73 was never true

74 raise ValidationError(self.message, code='unique') 

75 

76 def __repr__(self): 

77 return '<%s(queryset=%s)>' % ( 

78 self.__class__.__name__, 

79 smart_repr(self.queryset) 

80 ) 

81 

82 

83class UniqueTogetherValidator: 

84 """ 

85 Validator that corresponds to `unique_together = (...)` on a model class. 

86 

87 Should be applied to the serializer class, not to an individual field. 

88 """ 

89 message = _('The fields {field_names} must make a unique set.') 

90 missing_message = _('This field is required.') 

91 requires_context = True 

92 

93 def __init__(self, queryset, fields, message=None): 

94 self.queryset = queryset 

95 self.fields = fields 

96 self.message = message or self.message 

97 

98 def enforce_required_fields(self, attrs, serializer): 

99 """ 

100 The `UniqueTogetherValidator` always forces an implied 'required' 

101 state on the fields it applies to. 

102 """ 

103 if serializer.instance is not None: 

104 return 

105 

106 missing_items = { 

107 field_name: self.missing_message 

108 for field_name in self.fields 

109 if serializer.fields[field_name].source not in attrs 

110 } 

111 if missing_items: 

112 raise ValidationError(missing_items, code='required') 

113 

114 def filter_queryset(self, attrs, queryset, serializer): 

115 """ 

116 Filter the queryset to all instances matching the given attributes. 

117 """ 

118 # field names => field sources 

119 sources = [ 

120 serializer.fields[field_name].source 

121 for field_name in self.fields 

122 ] 

123 

124 # If this is an update, then any unprovided field should 

125 # have it's value set based on the existing instance attribute. 

126 if serializer.instance is not None: 

127 for source in sources: 

128 if source not in attrs: 

129 attrs[source] = getattr(serializer.instance, source) 

130 

131 # Determine the filter keyword arguments and filter the queryset. 

132 filter_kwargs = { 

133 source: attrs[source] 

134 for source in sources 

135 } 

136 return qs_filter(queryset, **filter_kwargs) 

137 

138 def exclude_current_instance(self, attrs, queryset, instance): 

139 """ 

140 If an instance is being updated, then do not include 

141 that instance itself as a uniqueness conflict. 

142 """ 

143 if instance is not None: 

144 return queryset.exclude(pk=instance.pk) 

145 return queryset 

146 

147 def __call__(self, attrs, serializer): 

148 self.enforce_required_fields(attrs, serializer) 

149 queryset = self.queryset 

150 queryset = self.filter_queryset(attrs, queryset, serializer) 

151 queryset = self.exclude_current_instance(attrs, queryset, serializer.instance) 

152 

153 # Ignore validation if any field is None 

154 checked_values = [ 

155 value for field, value in attrs.items() if field in self.fields 

156 ] 

157 if None not in checked_values and qs_exists(queryset): 

158 field_names = ', '.join(self.fields) 

159 message = self.message.format(field_names=field_names) 

160 raise ValidationError(message, code='unique') 

161 

162 def __repr__(self): 

163 return '<%s(queryset=%s, fields=%s)>' % ( 

164 self.__class__.__name__, 

165 smart_repr(self.queryset), 

166 smart_repr(self.fields) 

167 ) 

168 

169 

170class ProhibitSurrogateCharactersValidator: 

171 message = _('Surrogate characters are not allowed: U+{code_point:X}.') 

172 code = 'surrogate_characters_not_allowed' 

173 

174 def __call__(self, value): 

175 for surrogate_character in (ch for ch in str(value) 175 ↛ 177line 175 didn't jump to line 177, because the loop on line 175 never started

176 if 0xD800 <= ord(ch) <= 0xDFFF): 

177 message = self.message.format(code_point=ord(surrogate_character)) 

178 raise ValidationError(message, code=self.code) 

179 

180 

181class BaseUniqueForValidator: 

182 message = None 

183 missing_message = _('This field is required.') 

184 requires_context = True 

185 

186 def __init__(self, queryset, field, date_field, message=None): 

187 self.queryset = queryset 

188 self.field = field 

189 self.date_field = date_field 

190 self.message = message or self.message 

191 

192 def enforce_required_fields(self, attrs): 

193 """ 

194 The `UniqueFor<Range>Validator` classes always force an implied 

195 'required' state on the fields they are applied to. 

196 """ 

197 missing_items = { 

198 field_name: self.missing_message 

199 for field_name in [self.field, self.date_field] 

200 if field_name not in attrs 

201 } 

202 if missing_items: 

203 raise ValidationError(missing_items, code='required') 

204 

205 def filter_queryset(self, attrs, queryset, field_name, date_field_name): 

206 raise NotImplementedError('`filter_queryset` must be implemented.') 

207 

208 def exclude_current_instance(self, attrs, queryset, instance): 

209 """ 

210 If an instance is being updated, then do not include 

211 that instance itself as a uniqueness conflict. 

212 """ 

213 if instance is not None: 

214 return queryset.exclude(pk=instance.pk) 

215 return queryset 

216 

217 def __call__(self, attrs, serializer): 

218 # Determine the underlying model field names. These may not be the 

219 # same as the serializer field names if `source=<>` is set. 

220 field_name = serializer.fields[self.field].source_attrs[-1] 

221 date_field_name = serializer.fields[self.date_field].source_attrs[-1] 

222 

223 self.enforce_required_fields(attrs) 

224 queryset = self.queryset 

225 queryset = self.filter_queryset(attrs, queryset, field_name, date_field_name) 

226 queryset = self.exclude_current_instance(attrs, queryset, serializer.instance) 

227 if qs_exists(queryset): 

228 message = self.message.format(date_field=self.date_field) 

229 raise ValidationError({ 

230 self.field: message 

231 }, code='unique') 

232 

233 def __repr__(self): 

234 return '<%s(queryset=%s, field=%s, date_field=%s)>' % ( 

235 self.__class__.__name__, 

236 smart_repr(self.queryset), 

237 smart_repr(self.field), 

238 smart_repr(self.date_field) 

239 ) 

240 

241 

242class UniqueForDateValidator(BaseUniqueForValidator): 

243 message = _('This field must be unique for the "{date_field}" date.') 

244 

245 def filter_queryset(self, attrs, queryset, field_name, date_field_name): 

246 value = attrs[self.field] 

247 date = attrs[self.date_field] 

248 

249 filter_kwargs = {} 

250 filter_kwargs[field_name] = value 

251 filter_kwargs['%s__day' % date_field_name] = date.day 

252 filter_kwargs['%s__month' % date_field_name] = date.month 

253 filter_kwargs['%s__year' % date_field_name] = date.year 

254 return qs_filter(queryset, **filter_kwargs) 

255 

256 

257class UniqueForMonthValidator(BaseUniqueForValidator): 

258 message = _('This field must be unique for the "{date_field}" month.') 

259 

260 def filter_queryset(self, attrs, queryset, field_name, date_field_name): 

261 value = attrs[self.field] 

262 date = attrs[self.date_field] 

263 

264 filter_kwargs = {} 

265 filter_kwargs[field_name] = value 

266 filter_kwargs['%s__month' % date_field_name] = date.month 

267 return qs_filter(queryset, **filter_kwargs) 

268 

269 

270class UniqueForYearValidator(BaseUniqueForValidator): 

271 message = _('This field must be unique for the "{date_field}" year.') 

272 

273 def filter_queryset(self, attrs, queryset, field_name, date_field_name): 

274 value = attrs[self.field] 

275 date = attrs[self.date_field] 

276 

277 filter_kwargs = {} 

278 filter_kwargs[field_name] = value 

279 filter_kwargs['%s__year' % date_field_name] = date.year 

280 return qs_filter(queryset, **filter_kwargs)