Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/coreschema/schemas.py: 20%

314 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1from collections import namedtuple 

2from coreschema.compat import text_types, numeric_types 

3from coreschema.formats import validate_format 

4from coreschema.utils import uniq 

5import re 

6 

7 

# Validation error record. `text` is the formatted message; `index` is the
# list of keys/offsets leading to the offending value (empty at the root).
Error = namedtuple('Error', 'text index')

9 

10 

def push_index(errors, key):
    """Return new errors with `key` prepended to each error's index.

    Used by container schemas to record where, inside the container, a
    child validation error occurred. The incoming errors are not modified.
    """
    prefix = [key]
    nested = []
    for err in errors:
        nested.append(Error(err.text, prefix + err.index))
    return nested

16 

17 

18# TODO: Properties as OrderedDict if from list of tuples. 

19# TODO: null keyword / Nullable 

20# TODO: dependencies

21# TODO: remote ref 

22# TODO: remaining formats 

23# LATER: Enum display values 

24# LATER: File 

25# LATER: strict, coerce float etc... 

26# LATER: decimals 

27# LATER: override errors 

28 

29 

class Schema(object):
    """Base class shared by every schema type in this module.

    Subclasses supply an `errors` mapping from error code to a message
    template, and a `validate(value, context=None)` method returning a list
    of `Error` items (an empty list means the value is valid).

    Schemas can be combined with operators: `a | b` (Union), `a & b`
    (Intersection), `a ^ b` (ExclusiveUnion) and `~a` (Not).
    """
    # Error code -> message template; templates may reference instance
    # attributes by name, e.g. '{max_items}'.
    errors = {}

    def __init__(self, title='', description='', default=None):
        self.title = title
        self.description = description
        self.default = default

    def make_error(self, code):
        """Build a root-level `Error` for `code`.

        The message template is formatted with this instance's attributes,
        and the resulting error has an empty index.
        """
        template = self.errors[code]
        message = template.format(**self.__dict__)
        return Error(message, [])

    def __or__(self, other):
        # Flatten operands that are already unions so `a | b | c` yields a
        # single Union with three children.
        left = self.children if isinstance(self, Union) else [self]
        right = other.children if isinstance(other, Union) else [other]
        return Union(left + right)

    def __and__(self, other):
        # Same flattening as `__or__`, but for intersections.
        left = self.children if isinstance(self, Intersection) else [self]
        right = other.children if isinstance(other, Intersection) else [other]
        return Intersection(left + right)

    def __xor__(self, other):
        # No flattening here: the result always has exactly two children.
        return ExclusiveUnion([self, other])

    def __invert__(self):
        return Not(self)

    def __eq__(self, other):
        # Schemas are equal when they are the same class and carry the same
        # instance attributes.
        if self.__class__ != other.__class__:
            return False
        return self.__dict__ == other.__dict__

80 

81 

class Object(Schema):
    """Schema for objects (Python `dict` values).

    Arguments:
        properties: optional mapping of key -> schema for named properties.
        required: optional iterable of keys that must be present.
        max_properties / min_properties: optional bounds on the number of keys.
        pattern_properties: optional mapping of regex string -> schema, applied
            to keys that are not listed in `properties`.
        additional_properties: `True` to accept any other key (validated with
            `Anything`), `False` to reject other keys, or a schema that other
            keys' values must satisfy.
    """
    errors = {
        'type': 'Must be an object.',
        'invalid_key': 'Object keys must be strings.',
        'empty': 'Must not be empty.',
        'required': 'This field is required.',
        'max_properties': 'Must have no more than {max_properties} properties.',
        'min_properties': 'Must have at least {min_properties} properties.',
        'invalid_property': 'Invalid property.'
    }

    def __init__(self, properties=None, required=None, max_properties=None, min_properties=None, pattern_properties=None, additional_properties=True, **kwargs):
        super(Object, self).__init__(**kwargs)

        if isinstance(additional_properties, bool):
            # Handle `additional_properties` set to a boolean.
            self.additional_properties_schema = Anything()
        else:
            # Handle `additional_properties` set to a schema: additional keys
            # are allowed, and their values are checked against that schema.
            self.additional_properties_schema = additional_properties
            additional_properties = True

        self.properties = properties
        self.required = required or []
        self.max_properties = max_properties
        self.min_properties = min_properties
        self.pattern_properties = pattern_properties
        self.additional_properties = additional_properties

        # Compile pattern regexes once, up front.
        self.pattern_properties_regex = None
        if pattern_properties is not None:
            self.pattern_properties_regex = {
                re.compile(key): value
                for key, value
                in pattern_properties.items()
            }

    def validate(self, value, context=None):
        """Validate `value` and return a list of `Error` items.

        Errors raised by child schemas have the offending key pushed onto
        their index. A non-dict value yields a single 'type' error.
        """
        if not isinstance(value, dict):
            return [self.make_error('type')]

        errors = []
        if any(not isinstance(key, text_types) for key in value.keys()):
            errors += [self.make_error('invalid_key')]
        if self.required is not None:
            for key in self.required:
                if key not in value:
                    error_items = [self.make_error('required')]
                    errors += push_index(error_items, key)
        if self.min_properties is not None:
            if len(value) < self.min_properties:
                # A minimum of exactly one reads better as "must not be empty".
                if self.min_properties == 1:
                    errors += [self.make_error('empty')]
                else:
                    errors += [self.make_error('min_properties')]
        if self.max_properties is not None:
            if len(value) > self.max_properties:
                errors += [self.make_error('max_properties')]

        # Properties
        remaining_keys = set(value.keys())
        if self.properties is not None:
            remaining_keys -= set(self.properties.keys())
            for key, property_item in self.properties.items():
                if key not in value:
                    continue
                error_items = property_item.validate(value[key], context)
                errors += push_index(error_items, key)

        # Pattern properties
        if self.pattern_properties is not None:
            # Iterate over a copy, since matched keys are discarded as we go.
            for key in list(remaining_keys):
                if not isinstance(key, text_types):
                    # Non-string keys were already reported as 'invalid_key'.
                    # Running a regex search on them would raise TypeError,
                    # so leave them for the additional-properties step.
                    continue
                for pattern, schema in self.pattern_properties_regex.items():
                    if re.search(pattern, key):
                        error_items = schema.validate(value[key], context)
                        errors += push_index(error_items, key)
                        remaining_keys.discard(key)

        # Additional properties
        if self.additional_properties:
            for key in remaining_keys:
                error_items = self.additional_properties_schema.validate(value[key], context)
                errors += push_index(error_items, key)
        else:
            for key in remaining_keys:
                error_items = [self.make_error('invalid_property')]
                errors += push_index(error_items, key)

        return errors

172 

173 

class Array(Schema):
    """Schema for arrays (Python `list` values).

    Arguments:
        items: a single schema applied to every element, or a list of schemas
            applied positionally. Defaults to `Anything()`.
        max_items / min_items: optional bounds on the list length.
        unique_items: when true, duplicate elements are reported.
        additional_items: only relevant when `items` is a list. A boolean, or
            a schema applied to elements beyond the positional schemas.
    """
    errors = {
        'type': 'Must be an array.',
        'empty': 'Must not be empty.',
        'max_items': 'Must have no more than {max_items} items.',
        'min_items': 'Must have at least {min_items} items.',
        'unique': 'Must not contain duplicate items.'
    }

    def __init__(self, items=None, max_items=None, min_items=None, unique_items=False, additional_items=True, **kwargs):
        super(Array, self).__init__(**kwargs)

        if items is None:
            items = Anything()

        # Positional schemas with `additional_items=False` cap the length at
        # the number of schemas, unless a tighter `max_items` was given.
        fixed_length = isinstance(items, list) and additional_items is False
        if fixed_length and (max_items is None or max_items > len(items)):
            max_items = len(items)

        self.items = items
        self.max_items = max_items
        self.min_items = min_items
        self.unique_items = unique_items
        self.additional_items = additional_items

    def validate(self, value, context=None):
        """Validate `value` and return a list of `Error` items.

        Element errors have the element's position pushed onto their index.
        A non-list value yields a single 'type' error.
        """
        if not isinstance(value, list):
            return [self.make_error('type')]

        errors = []
        if self.items is not None:
            current = self.items
            positional = isinstance(self.items, list)
            for position, element in enumerate(value):
                if positional:
                    if position < len(self.items):
                        # Element has its own positional schema.
                        current = self.items[position]
                    elif isinstance(self.additional_items, bool):
                        # Extra elements with no schema to check them
                        # against: stop validating elements.
                        break
                    else:
                        # Extra elements share the `additional_items` schema.
                        current = self.additional_items
                element_errors = current.validate(element, context)
                errors.extend(push_index(element_errors, position))

        if self.min_items is not None and len(value) < self.min_items:
            # A minimum of exactly one reads better as "must not be empty".
            code = 'empty' if self.min_items == 1 else 'min_items'
            errors.append(self.make_error(code))
        if self.max_items is not None and len(value) > self.max_items:
            errors.append(self.make_error('max_items'))
        if self.unique_items and not uniq(value):
            errors.append(self.make_error('unique'))

        return errors

236 

237 

class Number(Schema):
    """Schema for numeric values.

    Arguments:
        minimum / maximum: optional bounds.
        exclusive_minimum / exclusive_maximum: when true, the corresponding
            bound itself is not an allowed value.
        multiple_of: optional value the number must be a multiple of.
    """
    # Subclasses set this to reject floats that have a fractional part.
    integer_only = False
    errors = {
        'type': 'Must be a number.',
        'minimum': 'Must be greater than or equal to {minimum}.',
        'exclusive_minimum': 'Must be greater than {minimum}.',
        'maximum': 'Must be less than or equal to {maximum}.',
        'exclusive_maximum': 'Must be less than {maximum}.',
        'multiple_of': 'Must be a multiple of {multiple_of}.',
    }

    def __init__(self, minimum=None, maximum=None, exclusive_minimum=False, exclusive_maximum=False, multiple_of=None, **kwargs):
        super(Number, self).__init__(**kwargs)
        self.minimum = minimum
        self.maximum = maximum
        self.exclusive_minimum = exclusive_minimum
        self.exclusive_maximum = exclusive_maximum
        self.multiple_of = multiple_of

    def validate(self, value, context=None):
        """Validate `value` and return a list of `Error` items."""
        # In Python `bool` subclasses `int`, so booleans are rejected
        # explicitly before the numeric type check.
        if isinstance(value, bool) or not isinstance(value, numeric_types):
            return [self.make_error('type')]
        if self.integer_only and isinstance(value, float) and not value.is_integer():
            return [self.make_error('type')]

        errors = []

        if self.minimum is not None:
            if self.exclusive_minimum:
                if value <= self.minimum:
                    errors.append(self.make_error('exclusive_minimum'))
            elif value < self.minimum:
                errors.append(self.make_error('minimum'))

        if self.maximum is not None:
            if self.exclusive_maximum:
                if value >= self.maximum:
                    errors.append(self.make_error('exclusive_maximum'))
            elif value > self.maximum:
                errors.append(self.make_error('maximum'))

        step = self.multiple_of
        if step is not None:
            if isinstance(step, float):
                # Float steps are checked by division, not by modulo.
                off_step = not (float(value) / step).is_integer()
            else:
                off_step = value % step
            if off_step:
                errors.append(self.make_error('multiple_of'))

        return errors

289 

290 

class Integer(Number):
    """Number schema that rejects floats with a fractional part."""
    # Same messages as `Number`, except for the type error.
    errors = dict(Number.errors, type='Must be an integer.')
    integer_only = True

301 

302 

class String(Schema):
    """Schema for text values.

    Arguments:
        max_length / min_length: optional bounds on the string length.
        pattern: optional regex string that must match somewhere in the value.
        format: optional format name checked through `validate_format`.
    """
    errors = {
        'type': 'Must be a string.',
        'blank': 'Must not be blank.',
        'max_length': 'Must have no more than {max_length} characters.',
        'min_length': 'Must have at least {min_length} characters.',
        'pattern': 'Must match the pattern /{pattern}/.',
        'format': 'Must be a valid {format}.',
    }

    def __init__(self, max_length=None, min_length=None, pattern=None, format=None, **kwargs):
        super(String, self).__init__(**kwargs)
        self.max_length = max_length
        self.min_length = min_length
        self.pattern = pattern
        self.format = format

        # Compile the pattern once so validation does not redo the work.
        self.pattern_regex = re.compile(pattern) if self.pattern is not None else None

    def validate(self, value, context=None):
        """Validate `value` and return a list of `Error` items."""
        if not isinstance(value, text_types):
            return [self.make_error('type')]

        errors = []
        length = len(value)

        if self.min_length is not None and length < self.min_length:
            # A minimum of exactly one reads better as "must not be blank".
            code = 'blank' if self.min_length == 1 else 'min_length'
            errors.append(self.make_error(code))
        if self.max_length is not None and length > self.max_length:
            errors.append(self.make_error('max_length'))
        if self.pattern is not None and not re.search(self.pattern_regex, value):
            errors.append(self.make_error('pattern'))
        if self.format is not None and not validate_format(value, self.format):
            errors.append(self.make_error('format'))

        return errors

345 

346 

class Boolean(Schema):
    """Schema accepting only `True` or `False`."""
    errors = {
        'type': 'Must be a boolean.'
    }

    def validate(self, value, context=None):
        """Return a 'type' error unless `value` is a `bool`."""
        if isinstance(value, bool):
            return []
        return [self.make_error('type')]

356 

357 

class Null(Schema):
    """Schema accepting only `None`."""
    errors = {
        'type': 'Must be null.'
    }

    def validate(self, value, context=None):
        """Return a 'type' error unless `value` is `None`."""
        if value is None:
            return []
        return [self.make_error('type')]

367 

368 

class Enum(Schema):
    """Schema accepting only values contained in `enum`.

    `enum` must support `len()` and membership tests; when it holds exactly
    one option it must also support indexing.
    """
    errors = {
        'enum': 'Must be one of {enum}.',
        'exact': 'Must be {exact}.',
    }

    def __init__(self, enum, **kwargs):
        super(Enum, self).__init__(**kwargs)

        self.enum = enum
        single_option = len(enum) == 1
        if single_option:
            # Stored so the 'exact' message template can reference it.
            self.exact = repr(enum[0])

    def validate(self, value, context=None):
        """Return an error unless `value` is one of the allowed options."""
        if value in self.enum:
            return []
        # With a single option, name it rather than listing "one of".
        code = 'exact' if len(self.enum) == 1 else 'enum'
        return [self.make_error(code)]

388 

389 

class Anything(Schema):
    """Schema accepting any primitive value, list, or dict.

    Lists and dicts are validated with a default `Array()` / `Object()`,
    whose own defaults check contained values with `Anything` in turn.
    """
    errors = {
        'type': 'Must be a valid primitive type.'
    }
    types = text_types + (dict, list, int, float, bool, type(None))

    def validate(self, value, context=None):
        """Validate `value` and return a list of `Error` items."""
        if not isinstance(value, self.types):
            return [self.make_error('type')]

        if isinstance(value, list):
            container_schema = Array()
        elif isinstance(value, dict):
            container_schema = Object()
        else:
            # Plain primitives need no further checks.
            return []
        return list(container_schema.validate(value, context))

408 

409 

410# Composites 

411 

class Union(Schema):
    """Schema satisfied when at least one child schema accepts the value."""
    errors = {
        'match': 'Must match one of the options.'
    }

    def __init__(self, children, **kwargs):
        super(Union, self).__init__(**kwargs)

        self.children = children

    def validate(self, value, context=None):
        """Return no errors if any child validates cleanly, else 'match'."""
        # `any` stops at the first child that accepts the value.
        accepted = any(
            child.validate(value, context) == []
            for child in self.children
        )
        if accepted:
            return []
        return [self.make_error('match')]

427 

428 

class Intersection(Schema):
    """Schema satisfied only when every child schema accepts the value."""

    def __init__(self, children, **kwargs):
        super(Intersection, self).__init__(**kwargs)
        self.children = children

    def validate(self, value, context=None):
        """Return the concatenated errors of all children, in child order."""
        return [
            error
            for child in self.children
            for error in child.validate(value, context)
        ]

439 

440 

class ExclusiveUnion(Schema):
    """Schema satisfied when exactly one child schema accepts the value."""
    errors = {
        'match': 'Must match one of the options.',
        'match_only_one': 'Must match only one of the options.'
    }

    def __init__(self, children, **kwargs):
        super(ExclusiveUnion, self).__init__(**kwargs)

        self.children = children

    def validate(self, value, context=None):
        """Return no errors when exactly one child validates cleanly."""
        # Every child is evaluated, since all matches must be counted.
        matches = sum(
            1 for child in self.children
            if child.validate(value, context) == []
        )
        if matches == 0:
            return [self.make_error('match')]
        if matches > 1:
            return [self.make_error('match_only_one')]
        return []

462 

463 

class Not(Schema):
    """Schema satisfied only when the child schema rejects the value."""
    errors = {
        'must_not_match': 'Must not match the option.'
    }

    def __init__(self, child, **kwargs):
        super(Not, self).__init__(**kwargs)
        self.child = child

    def validate(self, value, context=None):
        """Return 'must_not_match' when the child accepts `value`."""
        child_errors = self.child.validate(value, context)
        if not child_errors:
            # The child accepted the value, so the negation fails.
            return [self.make_error('must_not_match')]
        return []

478 

479 

480# References 

481 

class Ref(Schema):
    """Named reference to a schema, resolved from the validation context.

    The context must be a dict with a 'refs' mapping from reference name to
    schema; `RefSpace.validate` builds such a context.
    """

    def __init__(self, ref_name, **kwargs):
        # Run the base initializer so `title`, `description` and `default`
        # exist on references just as they do on every other schema.
        super(Ref, self).__init__(**kwargs)
        self.ref_name = ref_name

    def dereference(self, context):
        """Return the schema registered under `ref_name` in `context`.

        Raises AssertionError when the context is not a dict, has no 'refs'
        entry, or does not define this reference.
        """
        assert isinstance(context, dict), 'Ref requires a dict context.'
        assert 'refs' in context, "Ref context must contain 'refs'."
        assert self.ref_name in context['refs'], (
            'Unknown reference %r.' % (self.ref_name,)
        )
        return context['refs'][self.ref_name]

    def validate(self, value, context=None):
        """Validate `value` against the referenced schema."""
        schema = self.dereference(context)
        return schema.validate(value, context)

495 

496 

class RefSpace(Schema):
    """A set of named schemas plus the name of the root schema.

    Arguments:
        refs: mapping of reference name -> schema.
        root: name of the entry in `refs` used to validate values.
    """

    def __init__(self, refs, root, **kwargs):
        assert root in refs, 'Root reference %r is not defined in refs.' % (root,)
        # Run the base initializer so `title`, `description` and `default`
        # exist here just as they do on every other schema.
        super(RefSpace, self).__init__(**kwargs)
        self.refs = refs
        self.root = root
        self.root_validator = refs[root]

    def validate(self, value, context=None):
        """Validate `value` against the root schema.

        `context` is accepted so a RefSpace can be called like any other
        schema (containers pass a context to their children). It is not
        used: validation always runs with this RefSpace's own `refs`.
        """
        own_context = {'refs': self.refs}
        return self.root_validator.validate(value, own_context)