Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/coreschema/schemas.py: 20%
314 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1from collections import namedtuple
2from coreschema.compat import text_types, numeric_types
3from coreschema.formats import validate_format
4from coreschema.utils import uniq
5import re
8Error = namedtuple('Error', ['text', 'index'])
11def push_index(errors, key):
12 return [
13 Error(error.text, [key] + error.index)
14 for error in errors
15 ]
18# TODO: Properties as OrderedDict if from list of tuples.
19# TODO: null keyword / Nullable
20# TODO: dependancies
21# TODO: remote ref
22# TODO: remaining formats
23# LATER: Enum display values
24# LATER: File
25# LATER: strict, coerce float etc...
26# LATER: decimals
27# LATER: override errors
30class Schema(object):
31 errors = {}
33 def __init__(self, title='', description='', default=None):
34 self.title = title
35 self.description = description
36 self.default = default
38 def make_error(self, code):
39 error_string = self.errors[code]
40 params = self.__dict__
41 return Error(error_string.format(**params), [])
43 def __or__(self, other):
44 if isinstance(self, Union):
45 self_children = self.children
46 else:
47 self_children = [self]
49 if isinstance(other, Union):
50 other_children = other.children
51 else:
52 other_children = [other]
54 return Union(self_children + other_children)
56 def __and__(self, other):
57 if isinstance(self, Intersection):
58 self_children = self.children
59 else:
60 self_children = [self]
62 if isinstance(other, Intersection):
63 other_children = other.children
64 else:
65 other_children = [other]
67 return Intersection(self_children + other_children)
69 def __xor__(self, other):
70 return ExclusiveUnion([self, other])
72 def __invert__(self):
73 return Not(self)
75 def __eq__(self, other):
76 return (
77 self.__class__ == other.__class__ and
78 self.__dict__ == other.__dict__
79 )
82class Object(Schema):
83 errors = {
84 'type': 'Must be an object.',
85 'invalid_key': 'Object keys must be strings.',
86 'empty': 'Must not be empty.',
87 'required': 'This field is required.',
88 'max_properties': 'Must have no more than {max_properties} properties.',
89 'min_properties': 'Must have at least {min_properties} properties.',
90 'invalid_property': 'Invalid property.'
91 }
93 def __init__(self, properties=None, required=None, max_properties=None, min_properties=None, pattern_properties=None, additional_properties=True, **kwargs):
94 super(Object, self).__init__(**kwargs)
96 if isinstance(additional_properties, bool):
97 # Handle `additional_properties` set to a boolean.
98 self.additional_properties_schema = Anything()
99 else:
100 # Handle `additional_properties` set to a schema.
101 self.additional_properties_schema = additional_properties
102 additional_properties = True
104 self.properties = properties
105 self.required = required or []
106 self.max_properties = max_properties
107 self.min_properties = min_properties
108 self.pattern_properties = pattern_properties
109 self.additional_properties = additional_properties
111 # Compile pattern regexes.
112 self.pattern_properties_regex = None
113 if pattern_properties is not None:
114 self.pattern_properties_regex = {
115 re.compile(key): value
116 for key, value
117 in pattern_properties.items()
118 }
120 def validate(self, value, context=None):
121 if not isinstance(value, dict):
122 return [self.make_error('type')]
124 errors = []
125 if any(not isinstance(key, text_types) for key in value.keys()):
126 errors += [self.make_error('invalid_key')]
127 if self.required is not None:
128 for key in self.required:
129 if key not in value:
130 error_items = [self.make_error('required')]
131 errors += push_index(error_items, key)
132 if self.min_properties is not None:
133 if len(value) < self.min_properties:
134 if self.min_properties == 1:
135 errors += [self.make_error('empty')]
136 else:
137 errors += [self.make_error('min_properties')]
138 if self.max_properties is not None:
139 if len(value) > self.max_properties:
140 errors += [self.make_error('max_properties')]
142 # Properties
143 remaining_keys = set(value.keys())
144 if self.properties is not None:
145 remaining_keys -= set(self.properties.keys())
146 for key, property_item in self.properties.items():
147 if key not in value:
148 continue
149 error_items = property_item.validate(value[key], context)
150 errors += push_index(error_items, key)
152 # Pattern properties
153 if self.pattern_properties is not None:
154 for key in list(remaining_keys):
155 for pattern, schema in self.pattern_properties_regex.items():
156 if re.search(pattern, key):
157 error_items = schema.validate(value[key], context)
158 errors += push_index(error_items, key)
159 remaining_keys.discard(key)
161 # Additional properties
162 if self.additional_properties:
163 for key in remaining_keys:
164 error_items = self.additional_properties_schema.validate(value[key], context)
165 errors += push_index(error_items, key)
166 else:
167 for key in remaining_keys:
168 error_items = [self.make_error('invalid_property')]
169 errors += push_index(error_items, key)
171 return errors
174class Array(Schema):
175 errors = {
176 'type': 'Must be an array.',
177 'empty': 'Must not be empty.',
178 'max_items': 'Must have no more than {max_items} items.',
179 'min_items': 'Must have at least {min_items} items.',
180 'unique': 'Must not contain duplicate items.'
181 }
183 def __init__(self, items=None, max_items=None, min_items=None, unique_items=False, additional_items=True, **kwargs):
184 super(Array, self).__init__(**kwargs)
186 if items is None:
187 items = Anything()
189 if isinstance(items, list) and additional_items is False:
190 # Setting additional_items==False implies a value for max_items.
191 if max_items is None or max_items > len(items):
192 max_items = len(items)
194 self.items = items
195 self.max_items = max_items
196 self.min_items = min_items
197 self.unique_items = unique_items
198 self.additional_items = additional_items
200 def validate(self, value, context=None):
201 if not isinstance(value, list):
202 return [self.make_error('type')]
204 errors = []
205 if self.items is not None:
206 child_schema = self.items
207 is_list = isinstance(self.items, list)
208 for idx, item in enumerate(value):
209 if is_list:
210 # Case where `items` is a list of schemas.
211 if idx < len(self.items):
212 # Handle each item in the list.
213 child_schema = self.items[idx]
214 else:
215 # Handle any additional items.
216 if isinstance(self.additional_items, bool):
217 break
218 else:
219 child_schema = self.additional_items
220 error_items = child_schema.validate(item, context)
221 errors += push_index(error_items, idx)
222 if self.min_items is not None:
223 if len(value) < self.min_items:
224 if self.min_items == 1:
225 errors += [self.make_error('empty')]
226 else:
227 errors += [self.make_error('min_items')]
228 if self.max_items is not None:
229 if len(value) > self.max_items:
230 errors += [self.make_error('max_items')]
231 if self.unique_items:
232 if not(uniq(value)):
233 errors += [self.make_error('unique')]
235 return errors
238class Number(Schema):
239 integer_only = False
240 errors = {
241 'type': 'Must be a number.',
242 'minimum': 'Must be greater than or equal to {minimum}.',
243 'exclusive_minimum': 'Must be greater than {minimum}.',
244 'maximum': 'Must be less than or equal to {maximum}.',
245 'exclusive_maximum': 'Must be less than {maximum}.',
246 'multiple_of': 'Must be a multiple of {multiple_of}.',
247 }
249 def __init__(self, minimum=None, maximum=None, exclusive_minimum=False, exclusive_maximum=False, multiple_of=None, **kwargs):
250 super(Number, self).__init__(**kwargs)
251 self.minimum = minimum
252 self.maximum = maximum
253 self.exclusive_minimum = exclusive_minimum
254 self.exclusive_maximum = exclusive_maximum
255 self.multiple_of = multiple_of
257 def validate(self, value, context=None):
258 if isinstance(value, bool):
259 # In Python `bool` subclasses `int`, so handle that case explicitly.
260 return [self.make_error('type')]
261 if not isinstance(value, numeric_types):
262 return [self.make_error('type')]
263 if self.integer_only and isinstance(value, float) and not value.is_integer():
264 return [self.make_error('type')]
266 errors = []
267 if self.minimum is not None:
268 if self.exclusive_minimum:
269 if value <= self.minimum:
270 errors += [self.make_error('exclusive_minimum')]
271 else:
272 if value < self.minimum:
273 errors += [self.make_error('minimum')]
274 if self.maximum is not None:
275 if self.exclusive_maximum:
276 if value >= self.maximum:
277 errors += [self.make_error('exclusive_maximum')]
278 else:
279 if value > self.maximum:
280 errors += [self.make_error('maximum')]
281 if self.multiple_of is not None:
282 if isinstance(self.multiple_of, float):
283 failed = not (float(value) / self.multiple_of).is_integer()
284 else:
285 failed = value % self.multiple_of
286 if failed:
287 errors += [self.make_error('multiple_of')]
288 return errors
291class Integer(Number):
292 errors = {
293 'type': 'Must be an integer.',
294 'minimum': 'Must be greater than or equal to {minimum}.',
295 'exclusive_minimum': 'Must be greater than {minimum}.',
296 'maximum': 'Must be less than or equal to {maximum}.',
297 'exclusive_maximum': 'Must be less than {maximum}.',
298 'multiple_of': 'Must be a multiple of {multiple_of}.',
299 }
300 integer_only = True
303class String(Schema):
304 errors = {
305 'type': 'Must be a string.',
306 'blank': 'Must not be blank.',
307 'max_length': 'Must have no more than {max_length} characters.',
308 'min_length': 'Must have at least {min_length} characters.',
309 'pattern': 'Must match the pattern /{pattern}/.',
310 'format': 'Must be a valid {format}.',
311 }
313 def __init__(self, max_length=None, min_length=None, pattern=None, format=None, **kwargs):
314 super(String, self).__init__(**kwargs)
315 self.max_length = max_length
316 self.min_length = min_length
317 self.pattern = pattern
318 self.format = format
320 self.pattern_regex = None
321 if self.pattern is not None:
322 self.pattern_regex = re.compile(pattern)
324 def validate(self, value, context=None):
325 if not isinstance(value, text_types):
326 return [self.make_error('type')]
328 errors = []
329 if self.min_length is not None:
330 if len(value) < self.min_length:
331 if self.min_length == 1:
332 errors += [self.make_error('blank')]
333 else:
334 errors += [self.make_error('min_length')]
335 if self.max_length is not None:
336 if len(value) > self.max_length:
337 errors += [self.make_error('max_length')]
338 if self.pattern is not None:
339 if not re.search(self.pattern_regex, value):
340 errors += [self.make_error('pattern')]
341 if self.format is not None:
342 if not validate_format(value, self.format):
343 errors += [self.make_error('format')]
344 return errors
347class Boolean(Schema):
348 errors = {
349 'type': 'Must be a boolean.'
350 }
352 def validate(self, value, context=None):
353 if not isinstance(value, bool):
354 return [self.make_error('type')]
355 return []
358class Null(Schema):
359 errors = {
360 'type': 'Must be null.'
361 }
363 def validate(self, value, context=None):
364 if value is not None:
365 return [self.make_error('type')]
366 return []
369class Enum(Schema):
370 errors = {
371 'enum': 'Must be one of {enum}.',
372 'exact': 'Must be {exact}.',
373 }
375 def __init__(self, enum, **kwargs):
376 super(Enum, self).__init__(**kwargs)
378 self.enum = enum
379 if len(enum) == 1:
380 self.exact = repr(enum[0])
382 def validate(self, value, context=None):
383 if value not in self.enum:
384 if len(self.enum) == 1:
385 return [self.make_error('exact')]
386 return [self.make_error('enum')]
387 return []
390class Anything(Schema):
391 errors = {
392 'type': 'Must be a valid primitive type.'
393 }
394 types = text_types + (dict, list, int, float, bool, type(None))
396 def validate(self, value, context=None):
397 if not isinstance(value, self.types):
398 return [self.make_error('type')]
400 errors = []
401 if isinstance(value, list):
402 schema = Array()
403 errors += schema.validate(value, context)
404 elif isinstance(value, dict):
405 schema = Object()
406 errors += schema.validate(value, context)
407 return errors
410# Composites
412class Union(Schema):
413 errors = {
414 'match': 'Must match one of the options.'
415 }
417 def __init__(self, children, **kwargs):
418 super(Union, self).__init__(**kwargs)
420 self.children = children
422 def validate(self, value, context=None):
423 for child in self.children:
424 if child.validate(value, context) == []:
425 return []
426 return [self.make_error('match')]
429class Intersection(Schema):
430 def __init__(self, children, **kwargs):
431 super(Intersection, self).__init__(**kwargs)
432 self.children = children
434 def validate(self, value, context=None):
435 errors = []
436 for child in self.children:
437 errors.extend(child.validate(value, context))
438 return errors
441class ExclusiveUnion(Schema):
442 errors = {
443 'match': 'Must match one of the options.',
444 'match_only_one': 'Must match only one of the options.'
445 }
447 def __init__(self, children, **kwargs):
448 super(ExclusiveUnion, self).__init__(**kwargs)
450 self.children = children
452 def validate(self, value, context=None):
453 matches = 0
454 for child in self.children:
455 if child.validate(value, context) == []:
456 matches += 1
457 if not matches:
458 return [self.make_error('match')]
459 elif matches > 1:
460 return [self.make_error('match_only_one')]
461 return []
464class Not(Schema):
465 errors = {
466 'must_not_match': 'Must not match the option.'
467 }
469 def __init__(self, child, **kwargs):
470 super(Not, self).__init__(**kwargs)
471 self.child = child
473 def validate(self, value, context=None):
474 errors = []
475 if self.child.validate(value, context):
476 return []
477 return [self.make_error('must_not_match')]
480# References
482class Ref(Schema):
483 def __init__(self, ref_name):
484 self.ref_name = ref_name
486 def dereference(self, context):
487 assert isinstance(context, dict)
488 assert 'refs' in context
489 assert self.ref_name in context['refs']
490 return context['refs'][self.ref_name]
492 def validate(self, value, context=None):
493 schema = self.dereference(context)
494 return schema.validate(value, context)
497class RefSpace(Schema):
498 def __init__(self, refs, root):
499 assert root in refs
500 self.refs = refs
501 self.root = root
502 self.root_validator = refs[root]
504 def validate(self, value):
505 context = {'refs': self.refs}
506 return self.root_validator.validate(value, context)