Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/schema.py: 44%

462 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1"""schema is a library for validating Python data structures, such as those 

2obtained from config-files, forms, external services or command-line 

3parsing, converted from JSON/YAML (or something else) to Python data-types.""" 

4 

5import inspect 

6import re 

7 

8try: 

9 from contextlib import ExitStack 

10except ImportError: 

11 from contextlib2 import ExitStack 

12 

13 

14__version__ = "0.7.5" 

15__all__ = [ 

16 "Schema", 

17 "And", 

18 "Or", 

19 "Regex", 

20 "Optional", 

21 "Use", 

22 "Forbidden", 

23 "Const", 

24 "Literal", 

25 "SchemaError", 

26 "SchemaWrongKeyError", 

27 "SchemaMissingKeyError", 

28 "SchemaForbiddenKeyError", 

29 "SchemaUnexpectedTypeError", 

30 "SchemaOnlyOneAllowedError", 

31] 

32 

33 

34class SchemaError(Exception): 

35 """Error during Schema validation.""" 

36 

37 def __init__(self, autos, errors=None): 

38 self.autos = autos if type(autos) is list else [autos] 

39 self.errors = errors if type(errors) is list else [errors] 

40 Exception.__init__(self, self.code) 

41 

42 @property 

43 def code(self): 

44 """ 

45 Removes duplicates values in auto and error list. 

46 parameters. 

47 """ 

48 

49 def uniq(seq): 

50 """ 

51 Utility function that removes duplicate. 

52 """ 

53 seen = set() 

54 seen_add = seen.add 

55 # This way removes duplicates while preserving the order. 

56 return [x for x in seq if x not in seen and not seen_add(x)] 

57 

58 data_set = uniq(i for i in self.autos if i is not None) 

59 error_list = uniq(i for i in self.errors if i is not None) 

60 if error_list: 60 ↛ 61line 60 didn't jump to line 61, because the condition on line 60 was never true

61 return "\n".join(error_list) 

62 return "\n".join(data_set) 

63 

64 

65class SchemaWrongKeyError(SchemaError): 

66 """Error Should be raised when an unexpected key is detected within the 

67 data set being.""" 

68 

69 pass 

70 

71 

72class SchemaMissingKeyError(SchemaError): 

73 """Error should be raised when a mandatory key is not found within the 

74 data set being validated""" 

75 

76 pass 

77 

78 

79class SchemaOnlyOneAllowedError(SchemaError): 

80 """Error should be raised when an only_one Or key has multiple matching candidates""" 

81 

82 pass 

83 

84 

85class SchemaForbiddenKeyError(SchemaError): 

86 """Error should be raised when a forbidden key is found within the 

87 data set being validated, and its value matches the value that was specified""" 

88 

89 pass 

90 

91 

92class SchemaUnexpectedTypeError(SchemaError): 

93 """Error should be raised when a type mismatch is detected within the 

94 data set being validated.""" 

95 

96 pass 

97 

98 

99class And(object): 

100 """ 

101 Utility function to combine validation directives in AND Boolean fashion. 

102 """ 

103 

104 def __init__(self, *args, **kw): 

105 self._args = args 

106 if not set(kw).issubset({"error", "schema", "ignore_extra_keys"}): 106 ↛ 107line 106 didn't jump to line 107, because the condition on line 106 was never true

107 diff = {"error", "schema", "ignore_extra_keys"}.difference(kw) 

108 raise TypeError("Unknown keyword arguments %r" % list(diff)) 

109 self._error = kw.get("error") 

110 self._ignore_extra_keys = kw.get("ignore_extra_keys", False) 

111 # You can pass your inherited Schema class. 

112 self._schema = kw.get("schema", Schema) 

113 

114 def __repr__(self): 

115 return "%s(%s)" % (self.__class__.__name__, ", ".join(repr(a) for a in self._args)) 

116 

117 @property 

118 def args(self): 

119 """The provided parameters""" 

120 return self._args 

121 

122 def validate(self, data, **kwargs): 

123 """ 

124 Validate data using defined sub schema/expressions ensuring all 

125 values are valid. 

126 :param data: to be validated with sub defined schemas. 

127 :return: returns validated data 

128 """ 

129 for s in [self._schema(s, error=self._error, ignore_extra_keys=self._ignore_extra_keys) for s in self._args]: 

130 data = s.validate(data, **kwargs) 

131 return data 

132 

133 

134class Or(And): 

135 """Utility function to combine validation directives in a OR Boolean 

136 fashion.""" 

137 

138 def __init__(self, *args, **kwargs): 

139 self.only_one = kwargs.pop("only_one", False) 

140 self.match_count = 0 

141 super(Or, self).__init__(*args, **kwargs) 

142 

143 def reset(self): 

144 failed = self.match_count > 1 and self.only_one 

145 self.match_count = 0 

146 if failed: 146 ↛ 147line 146 didn't jump to line 147, because the condition on line 146 was never true

147 raise SchemaOnlyOneAllowedError(["There are multiple keys present " + "from the %r condition" % self]) 

148 

149 def validate(self, data, **kwargs): 

150 """ 

151 Validate data using sub defined schema/expressions ensuring at least 

152 one value is valid. 

153 :param data: data to be validated by provided schema. 

154 :return: return validated data if not validation 

155 """ 

156 autos, errors = [], [] 

157 for s in [self._schema(s, error=self._error, ignore_extra_keys=self._ignore_extra_keys) for s in self._args]: 157 ↛ 167line 157 didn't jump to line 167, because the loop on line 157 didn't complete

158 try: 

159 validation = s.validate(data, **kwargs) 

160 self.match_count += 1 

161 if self.match_count > 1 and self.only_one: 161 ↛ 162line 161 didn't jump to line 162, because the condition on line 161 was never true

162 break 

163 return validation 

164 except SchemaError as _x: 

165 autos += _x.autos 

166 errors += _x.errors 

167 raise SchemaError( 

168 ["%r did not validate %r" % (self, data)] + autos, 

169 [self._error.format(data) if self._error else None] + errors, 

170 ) 

171 

172 

173class Regex(object): 

174 """ 

175 Enables schema.py to validate string using regular expressions. 

176 """ 

177 

178 # Map all flags bits to a more readable description 

179 NAMES = [ 

180 "re.ASCII", 

181 "re.DEBUG", 

182 "re.VERBOSE", 

183 "re.UNICODE", 

184 "re.DOTALL", 

185 "re.MULTILINE", 

186 "re.LOCALE", 

187 "re.IGNORECASE", 

188 "re.TEMPLATE", 

189 ] 

190 

191 def __init__(self, pattern_str, flags=0, error=None): 

192 self._pattern_str = pattern_str 

193 flags_list = [ 

194 Regex.NAMES[i] for i, f in enumerate("{0:09b}".format(int(flags))) if f != "0" 

195 ] # Name for each bit 

196 

197 if flags_list: 

198 self._flags_names = ", flags=" + "|".join(flags_list) 

199 else: 

200 self._flags_names = "" 

201 

202 self._pattern = re.compile(pattern_str, flags=flags) 

203 self._error = error 

204 

205 def __repr__(self): 

206 return "%s(%r%s)" % (self.__class__.__name__, self._pattern_str, self._flags_names) 

207 

208 @property 

209 def pattern_str(self): 

210 """The pattern for the represented regular expression""" 

211 return self._pattern_str 

212 

213 def validate(self, data, **kwargs): 

214 """ 

215 Validated data using defined regex. 

216 :param data: data to be validated 

217 :return: return validated data. 

218 """ 

219 e = self._error 

220 

221 try: 

222 if self._pattern.search(data): 

223 return data 

224 else: 

225 raise SchemaError("%r does not match %r" % (self, data), e.format(data) if e else None) 

226 except TypeError: 

227 raise SchemaError("%r is not string nor buffer" % data, e) 

228 

229 

230class Use(object): 

231 """ 

232 For more general use cases, you can use the Use class to transform 

233 the data while it is being validate. 

234 """ 

235 

236 def __init__(self, callable_, error=None): 

237 if not callable(callable_): 

238 raise TypeError("Expected a callable, not %r" % callable_) 

239 self._callable = callable_ 

240 self._error = error 

241 

242 def __repr__(self): 

243 return "%s(%r)" % (self.__class__.__name__, self._callable) 

244 

245 def validate(self, data, **kwargs): 

246 try: 

247 return self._callable(data) 

248 except SchemaError as x: 

249 raise SchemaError([None] + x.autos, [self._error.format(data) if self._error else None] + x.errors) 

250 except BaseException as x: 

251 f = _callable_str(self._callable) 

252 raise SchemaError("%s(%r) raised %r" % (f, data, x), self._error.format(data) if self._error else None) 

253 

254 

255COMPARABLE, CALLABLE, VALIDATOR, TYPE, DICT, ITERABLE = range(6) 

256 

257 

258def _priority(s): 

259 """Return priority for a given object.""" 

260 if type(s) in (list, tuple, set, frozenset): 

261 return ITERABLE 

262 if type(s) is dict: 

263 return DICT 

264 if issubclass(type(s), type): 

265 return TYPE 

266 if isinstance(s, Literal): 266 ↛ 267line 266 didn't jump to line 267, because the condition on line 266 was never true

267 return COMPARABLE 

268 if hasattr(s, "validate"): 

269 return VALIDATOR 

270 if callable(s): 270 ↛ 271line 270 didn't jump to line 271, because the condition on line 270 was never true

271 return CALLABLE 

272 else: 

273 return COMPARABLE 

274 

275 

276def _invoke_with_optional_kwargs(f, **kwargs): 

277 s = inspect.signature(f) 

278 if len(s.parameters) == 0: 

279 return f() 

280 return f(**kwargs) 

281 

282 

283class Schema(object): 

284 """ 

285 Entry point of the library, use this class to instantiate validation 

286 schema for the data that will be validated. 

287 """ 

288 

289 def __init__(self, schema, error=None, ignore_extra_keys=False, name=None, description=None, as_reference=False): 

290 self._schema = schema 

291 self._error = error 

292 self._ignore_extra_keys = ignore_extra_keys 

293 self._name = name 

294 self._description = description 

295 # Ask json_schema to create a definition for this schema and use it as part of another 

296 self.as_reference = as_reference 

297 if as_reference and name is None: 297 ↛ 298line 297 didn't jump to line 298, because the condition on line 297 was never true

298 raise ValueError("Schema used as reference should have a name") 

299 

300 def __repr__(self): 

301 return "%s(%r)" % (self.__class__.__name__, self._schema) 

302 

303 @property 

304 def schema(self): 

305 return self._schema 

306 

307 @property 

308 def description(self): 

309 return self._description 

310 

311 @property 

312 def name(self): 

313 return self._name 

314 

315 @property 

316 def ignore_extra_keys(self): 

317 return self._ignore_extra_keys 

318 

319 @staticmethod 

320 def _dict_key_priority(s): 

321 """Return priority for a given key object.""" 

322 if isinstance(s, Hook): 322 ↛ 323line 322 didn't jump to line 323, because the condition on line 322 was never true

323 return _priority(s._schema) - 0.5 

324 if isinstance(s, Optional): 

325 return _priority(s._schema) + 0.5 

326 return _priority(s) 

327 

328 @staticmethod 

329 def _is_optional_type(s): 

330 """Return True if the given key is optional (does not have to be found)""" 

331 return any(isinstance(s, optional_type) for optional_type in [Optional, Hook]) 

332 

333 def is_valid(self, data, **kwargs): 

334 """Return whether the given data has passed all the validations 

335 that were specified in the given schema. 

336 """ 

337 try: 

338 self.validate(data, **kwargs) 

339 except SchemaError: 

340 return False 

341 else: 

342 return True 

343 

344 def _prepend_schema_name(self, message): 

345 """ 

346 If a custom schema name has been defined, prepends it to the error 

347 message that gets raised when a schema error occurs. 

348 """ 

349 if self._name: 349 ↛ 350line 349 didn't jump to line 350, because the condition on line 349 was never true

350 message = "{0!r} {1!s}".format(self._name, message) 

351 return message 

352 

353 def validate(self, data, **kwargs): 

354 Schema = self.__class__ 

355 s = self._schema 

356 e = self._error 

357 i = self._ignore_extra_keys 

358 

359 if isinstance(s, Literal): 359 ↛ 360line 359 didn't jump to line 360, because the condition on line 359 was never true

360 s = s.schema 

361 

362 flavor = _priority(s) 

363 if flavor == ITERABLE: 

364 data = Schema(type(s), error=e).validate(data, **kwargs) 

365 o = Or(*s, error=e, schema=Schema, ignore_extra_keys=i) 

366 return type(data)(o.validate(d, **kwargs) for d in data) 

367 if flavor == DICT: 

368 exitstack = ExitStack() 

369 data = Schema(dict, error=e).validate(data, **kwargs) 

370 new = type(data)() # new - is a dict of the validated values 

371 coverage = set() # matched schema keys 

372 # for each key and value find a schema entry matching them, if any 

373 sorted_skeys = sorted(s, key=self._dict_key_priority) 

374 for skey in sorted_skeys: 

375 if hasattr(skey, "reset"): 

376 exitstack.callback(skey.reset) 

377 

378 with exitstack: 

379 # Evaluate dictionaries last 

380 data_items = sorted(data.items(), key=lambda value: isinstance(value[1], dict)) 

381 for key, value in data_items: 

382 for skey in sorted_skeys: 

383 svalue = s[skey] 

384 try: 

385 nkey = Schema(skey, error=e).validate(key, **kwargs) 

386 except SchemaError: 

387 pass 

388 else: 

389 if isinstance(skey, Hook): 389 ↛ 398line 389 didn't jump to line 398, because the condition on line 389 was never true

390 # As the content of the value makes little sense for 

391 # keys with a hook, we reverse its meaning: 

392 # we will only call the handler if the value does match 

393 # In the case of the forbidden key hook, 

394 # we will raise the SchemaErrorForbiddenKey exception 

395 # on match, allowing for excluding a key only if its 

396 # value has a certain type, and allowing Forbidden to 

397 # work well in combination with Optional. 

398 try: 

399 nvalue = Schema(svalue, error=e).validate(value, **kwargs) 

400 except SchemaError: 

401 continue 

402 skey.handler(nkey, data, e) 

403 else: 

404 try: 

405 nvalue = Schema(svalue, error=e, ignore_extra_keys=i).validate(value, **kwargs) 

406 except SchemaError as x: 

407 k = "Key '%s' error:" % nkey 

408 message = self._prepend_schema_name(k) 

409 raise SchemaError([message] + x.autos, [e.format(data) if e else None] + x.errors) 

410 else: 

411 new[nkey] = nvalue 

412 coverage.add(skey) 

413 break 

414 required = set(k for k in s if not self._is_optional_type(k)) 

415 if not required.issubset(coverage): 415 ↛ 416line 415 didn't jump to line 416, because the condition on line 415 was never true

416 missing_keys = required - coverage 

417 s_missing_keys = ", ".join(repr(k) for k in sorted(missing_keys, key=repr)) 

418 message = "Missing key%s: %s" % (_plural_s(missing_keys), s_missing_keys) 

419 message = self._prepend_schema_name(message) 

420 raise SchemaMissingKeyError(message, e.format(data) if e else None) 

421 if not self._ignore_extra_keys and (len(new) != len(data)): 

422 wrong_keys = set(data.keys()) - set(new.keys()) 

423 s_wrong_keys = ", ".join(repr(k) for k in sorted(wrong_keys, key=repr)) 

424 message = "Wrong key%s %s in %r" % (_plural_s(wrong_keys), s_wrong_keys, data) 

425 message = self._prepend_schema_name(message) 

426 raise SchemaWrongKeyError(message, e.format(data) if e else None) 

427 

428 # Apply default-having optionals that haven't been used: 

429 defaults = set(k for k in s if isinstance(k, Optional) and hasattr(k, "default")) - coverage 

430 for default in defaults: 430 ↛ 431line 430 didn't jump to line 431, because the loop on line 430 never started

431 new[default.key] = _invoke_with_optional_kwargs(default.default, **kwargs) if callable(default.default) else default.default 

432 

433 return new 

434 if flavor == TYPE: 

435 if isinstance(data, s) and not (isinstance(data, bool) and s == int): 

436 return data 

437 else: 

438 message = "%r should be instance of %r" % (data, s.__name__) 

439 message = self._prepend_schema_name(message) 

440 raise SchemaUnexpectedTypeError(message, e.format(data) if e else None) 

441 if flavor == VALIDATOR: 

442 try: 

443 return s.validate(data, **kwargs) 

444 except SchemaError as x: 444 ↛ 446line 444 didn't jump to line 446

445 raise SchemaError([None] + x.autos, [e.format(data) if e else None] + x.errors) 

446 except BaseException as x: 

447 message = "%r.validate(%r) raised %r" % (s, data, x) 

448 message = self._prepend_schema_name(message) 

449 raise SchemaError(message, e.format(data) if e else None) 

450 if flavor == CALLABLE: 450 ↛ 451line 450 didn't jump to line 451, because the condition on line 450 was never true

451 f = _callable_str(s) 

452 try: 

453 if s(data): 

454 return data 

455 except SchemaError as x: 

456 raise SchemaError([None] + x.autos, [e.format(data) if e else None] + x.errors) 

457 except BaseException as x: 

458 message = "%s(%r) raised %r" % (f, data, x) 

459 message = self._prepend_schema_name(message) 

460 raise SchemaError(message, e.format(data) if e else None) 

461 message = "%s(%r) should evaluate to True" % (f, data) 

462 message = self._prepend_schema_name(message) 

463 raise SchemaError(message, e.format(data) if e else None) 

464 if s == data: 

465 return data 

466 else: 

467 message = "%r does not match %r" % (s, data) 

468 message = self._prepend_schema_name(message) 

469 raise SchemaError(message, e.format(data) if e else None) 

470 

471 def json_schema(self, schema_id, use_refs=False, **kwargs): 

472 """Generate a draft-07 JSON schema dict representing the Schema. 

473 This method must be called with a schema_id. 

474 

475 :param schema_id: The value of the $id on the main schema 

476 :param use_refs: Enable reusing object references in the resulting JSON schema. 

477 Schemas with references are harder to read by humans, but are a lot smaller when there 

478 is a lot of reuse 

479 """ 

480 

481 seen = dict() # For use_refs 

482 definitions_by_name = {} 

483 

484 def _json_schema(schema, is_main_schema=True, description=None, allow_reference=True): 

485 Schema = self.__class__ 

486 

487 def _create_or_use_ref(return_dict): 

488 """If not already seen, return the provided part of the schema unchanged. 

489 If already seen, give an id to the already seen dict and return a reference to the previous part 

490 of the schema instead. 

491 """ 

492 if not use_refs or is_main_schema: 

493 return return_schema 

494 

495 hashed = hash(repr(sorted(return_dict.items()))) 

496 

497 if hashed not in seen: 

498 seen[hashed] = return_dict 

499 return return_dict 

500 else: 

501 id_str = "#" + str(hashed) 

502 seen[hashed]["$id"] = id_str 

503 return {"$ref": id_str} 

504 

505 def _get_type_name(python_type): 

506 """Return the JSON schema name for a Python type""" 

507 if python_type == str: 

508 return "string" 

509 elif python_type == int: 

510 return "integer" 

511 elif python_type == float: 

512 return "number" 

513 elif python_type == bool: 

514 return "boolean" 

515 elif python_type == list: 

516 return "array" 

517 elif python_type == dict: 

518 return "object" 

519 return "string" 

520 

521 def _to_json_type(value): 

522 """Attempt to convert a constant value (for "const" and "default") to a JSON serializable value""" 

523 if value is None or type(value) in (str, int, float, bool, list, dict): 

524 return value 

525 

526 if type(value) in (tuple, set, frozenset): 

527 return list(value) 

528 

529 if isinstance(value, Literal): 

530 return value.schema 

531 

532 return str(value) 

533 

534 def _to_schema(s, ignore_extra_keys): 

535 if not isinstance(s, Schema): 

536 return Schema(s, ignore_extra_keys=ignore_extra_keys) 

537 

538 return s 

539 

540 s = schema.schema 

541 i = schema.ignore_extra_keys 

542 flavor = _priority(s) 

543 

544 return_schema = {} 

545 

546 return_description = description or schema.description 

547 if return_description: 

548 return_schema["description"] = return_description 

549 

550 # Check if we have to create a common definition and use as reference 

551 if allow_reference and schema.as_reference: 

552 # Generate sub schema if not already done 

553 if schema.name not in definitions_by_name: 

554 definitions_by_name[schema.name] = {} # Avoid infinite loop 

555 definitions_by_name[schema.name] = _json_schema(schema, is_main_schema=False, allow_reference=False) 

556 

557 return_schema["$ref"] = "#/definitions/" + schema.name 

558 else: 

559 if flavor == TYPE: 

560 # Handle type 

561 return_schema["type"] = _get_type_name(s) 

562 elif flavor == ITERABLE: 

563 # Handle arrays or dict schema 

564 

565 return_schema["type"] = "array" 

566 if len(s) == 1: 

567 return_schema["items"] = _json_schema(_to_schema(s[0], i), is_main_schema=False) 

568 elif len(s) > 1: 

569 return_schema["items"] = _json_schema(Schema(Or(*s)), is_main_schema=False) 

570 elif isinstance(s, Or): 

571 # Handle Or values 

572 

573 # Check if we can use an enum 

574 if all(priority == COMPARABLE for priority in [_priority(value) for value in s.args]): 

575 or_values = [str(s) if isinstance(s, Literal) else s for s in s.args] 

576 # All values are simple, can use enum or const 

577 if len(or_values) == 1: 

578 return_schema["const"] = _to_json_type(or_values[0]) 

579 return return_schema 

580 return_schema["enum"] = or_values 

581 else: 

582 # No enum, let's go with recursive calls 

583 any_of_values = [] 

584 for or_key in s.args: 

585 new_value = _json_schema(_to_schema(or_key, i), is_main_schema=False) 

586 if new_value != {} and new_value not in any_of_values: 

587 any_of_values.append(new_value) 

588 if len(any_of_values) == 1: 

589 # Only one representable condition remains, do not put under anyOf 

590 return_schema.update(any_of_values[0]) 

591 else: 

592 return_schema["anyOf"] = any_of_values 

593 elif isinstance(s, And): 

594 # Handle And values 

595 all_of_values = [] 

596 for and_key in s.args: 

597 new_value = _json_schema(_to_schema(and_key, i), is_main_schema=False) 

598 if new_value != {} and new_value not in all_of_values: 

599 all_of_values.append(new_value) 

600 if len(all_of_values) == 1: 

601 # Only one representable condition remains, do not put under allOf 

602 return_schema.update(all_of_values[0]) 

603 else: 

604 return_schema["allOf"] = all_of_values 

605 elif flavor == COMPARABLE: 

606 return_schema["const"] = _to_json_type(s) 

607 elif flavor == VALIDATOR and type(s) == Regex: 

608 return_schema["type"] = "string" 

609 return_schema["pattern"] = s.pattern_str 

610 else: 

611 if flavor != DICT: 

612 # If not handled, do not check 

613 return return_schema 

614 

615 # Schema is a dict 

616 

617 required_keys = [] 

618 expanded_schema = {} 

619 additional_properties = i 

620 for key in s: 

621 if isinstance(key, Hook): 

622 continue 

623 

624 def _key_allows_additional_properties(key): 

625 """Check if a key is broad enough to allow additional properties""" 

626 if isinstance(key, Optional): 

627 return _key_allows_additional_properties(key.schema) 

628 

629 return key == str or key == object 

630 

631 def _get_key_description(key): 

632 """Get the description associated to a key (as specified in a Literal object). Return None if not a Literal""" 

633 if isinstance(key, Optional): 

634 return _get_key_description(key.schema) 

635 

636 if isinstance(key, Literal): 

637 return key.description 

638 

639 return None 

640 

641 def _get_key_name(key): 

642 """Get the name of a key (as specified in a Literal object). Return the key unchanged if not a Literal""" 

643 if isinstance(key, Optional): 

644 return _get_key_name(key.schema) 

645 

646 if isinstance(key, Literal): 

647 return key.schema 

648 

649 return key 

650 

651 additional_properties = additional_properties or _key_allows_additional_properties(key) 

652 sub_schema = _to_schema(s[key], ignore_extra_keys=i) 

653 key_name = _get_key_name(key) 

654 

655 if isinstance(key_name, str): 

656 if not isinstance(key, Optional): 

657 required_keys.append(key_name) 

658 expanded_schema[key_name] = _json_schema( 

659 sub_schema, is_main_schema=False, description=_get_key_description(key) 

660 ) 

661 if isinstance(key, Optional) and hasattr(key, "default"): 

662 expanded_schema[key_name]["default"] = _to_json_type(_invoke_with_optional_kwargs(key.default, **kwargs) if callable(key.default) else key.default) 

663 elif isinstance(key_name, Or): 

664 # JSON schema does not support having a key named one name or another, so we just add both options 

665 # This is less strict because we cannot enforce that one or the other is required 

666 

667 for or_key in key_name.args: 

668 expanded_schema[_get_key_name(or_key)] = _json_schema( 

669 sub_schema, is_main_schema=False, description=_get_key_description(or_key) 

670 ) 

671 

672 return_schema.update( 

673 { 

674 "type": "object", 

675 "properties": expanded_schema, 

676 "required": required_keys, 

677 "additionalProperties": additional_properties, 

678 } 

679 ) 

680 

681 if is_main_schema: 

682 return_schema.update({"$id": schema_id, "$schema": "http://json-schema.org/draft-07/schema#"}) 

683 if self._name: 

684 return_schema["title"] = self._name 

685 

686 if definitions_by_name: 

687 return_schema["definitions"] = {} 

688 for definition_name, definition in definitions_by_name.items(): 

689 return_schema["definitions"][definition_name] = definition 

690 

691 return _create_or_use_ref(return_schema) 

692 

693 return _json_schema(self, True) 

694 

695 

696class Optional(Schema): 

697 """Marker for an optional part of the validation Schema.""" 

698 

699 _MARKER = object() 

700 

701 def __init__(self, *args, **kwargs): 

702 default = kwargs.pop("default", self._MARKER) 

703 super(Optional, self).__init__(*args, **kwargs) 

704 if default is not self._MARKER: 704 ↛ 706line 704 didn't jump to line 706, because the condition on line 704 was never true

705 # See if I can come up with a static key to use for myself: 

706 if _priority(self._schema) != COMPARABLE: 

707 raise TypeError( 

708 "Optional keys with defaults must have simple, " 

709 "predictable values, like literal strings or ints. " 

710 '"%r" is too complex.' % (self._schema,) 

711 ) 

712 self.default = default 

713 self.key = str(self._schema) 

714 

715 def __hash__(self): 

716 return hash(self._schema) 

717 

718 def __eq__(self, other): 

719 return ( 

720 self.__class__ is other.__class__ 

721 and getattr(self, "default", self._MARKER) == getattr(other, "default", self._MARKER) 

722 and self._schema == other._schema 

723 ) 

724 

725 def reset(self): 

726 if hasattr(self._schema, "reset"): 726 ↛ 727line 726 didn't jump to line 727, because the condition on line 726 was never true

727 self._schema.reset() 

728 

729 

730class Hook(Schema): 

731 def __init__(self, *args, **kwargs): 

732 self.handler = kwargs.pop("handler", lambda *args: None) 

733 super(Hook, self).__init__(*args, **kwargs) 

734 self.key = self._schema 

735 

736 

737class Forbidden(Hook): 

738 def __init__(self, *args, **kwargs): 

739 kwargs["handler"] = self._default_function 

740 super(Forbidden, self).__init__(*args, **kwargs) 

741 

742 @staticmethod 

743 def _default_function(nkey, data, error): 

744 raise SchemaForbiddenKeyError("Forbidden key encountered: %r in %r" % (nkey, data), error) 

745 

746 

747class Literal(object): 

748 def __init__(self, value, description=None): 

749 self._schema = value 

750 self._description = description 

751 

752 def __str__(self): 

753 return self._schema 

754 

755 def __repr__(self): 

756 return 'Literal("' + self.schema + '", description="' + (self.description or "") + '")' 

757 

758 @property 

759 def description(self): 

760 return self._description 

761 

762 @property 

763 def schema(self): 

764 return self._schema 

765 

766 

767class Const(Schema): 

768 def validate(self, data, **kwargs): 

769 super(Const, self).validate(data, **kwargs) 

770 return data 

771 

772 

773def _callable_str(callable_): 

774 if hasattr(callable_, "__name__"): 

775 return callable_.__name__ 

776 return str(callable_) 

777 

778 

779def _plural_s(sized): 

780 return "s" if len(sized) > 1 else ""