Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/db/models/fields/json.py: 48%

341 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1import json 

2 

3from django import forms 

4from django.core import checks, exceptions 

5from django.db import NotSupportedError, connections, router 

6from django.db.models import lookups 

7from django.db.models.lookups import PostgresOperatorLookup, Transform 

8from django.utils.translation import gettext_lazy as _ 

9 

10from . import Field 

11from .mixins import CheckFieldDefaultMixin 

12 

13__all__ = ["JSONField"] 

14 

15 

class JSONField(CheckFieldDefaultMixin, Field):
    """
    Model field holding JSON-serializable data.

    Values are serialized with ``json.dumps`` (optionally through the
    ``encoder`` class) on their way to the database and parsed with
    ``json.loads`` (optionally through the ``decoder`` class) on the way back.
    """

    empty_strings_allowed = False
    description = _("A JSON object")
    default_error_messages = {
        "invalid": _("Value must be valid JSON."),
    }
    _default_hint = ("dict", "{}")

    def __init__(
        self,
        verbose_name=None,
        name=None,
        encoder=None,
        decoder=None,
        **kwargs,
    ):
        """
        Remember the optional encoder/decoder and initialize the base field.

        Raise ValueError when a truthy ``encoder`` or ``decoder`` is not
        callable.
        """
        if encoder:
            if not callable(encoder):
                raise ValueError("The encoder parameter must be a callable object.")
        if decoder:
            if not callable(decoder):
                raise ValueError("The decoder parameter must be a callable object.")
        self.encoder = encoder
        self.decoder = decoder
        super().__init__(verbose_name, name, **kwargs)

    def check(self, **kwargs):
        """Run the inherited checks, then the per-database support check."""
        found = super().check(**kwargs)
        found += self._check_supported(kwargs.get("databases") or [])
        return found

    def _check_supported(self, databases):
        """
        Return a ``fields.E180`` error for every alias in ``databases`` whose
        backend reports no JSON field support.
        """
        problems = []
        for alias in databases:
            if not router.allow_migrate_model(alias, self.model):
                continue
            connection = connections[alias]
            meta = self.model._meta
            wanted_vendor = meta.required_db_vendor
            if wanted_vendor and wanted_vendor != connection.vendor:
                # The model targets a different vendor than this connection.
                continue
            supported = (
                "supports_json_field" in meta.required_db_features
                or connection.features.supports_json_field
            )
            if supported:
                continue
            problems.append(
                checks.Error(
                    "%s does not support JSONFields." % connection.display_name,
                    obj=self.model,
                    id="fields.E180",
                )
            )
        return problems

    def deconstruct(self):
        """Add the encoder/decoder to the deconstructed kwargs when set."""
        name, path, args, kwargs = super().deconstruct()
        for option in ("encoder", "decoder"):
            handler = getattr(self, option)
            if handler is not None:
                kwargs[option] = handler
        return name, path, args, kwargs

    def from_db_value(self, value, expression, connection):
        """
        Decode a database value with ``json.loads``.

        ``None`` is returned untouched, and so is any value that fails to
        decode as JSON.
        """
        if value is None:
            return value
        # Some backends (SQLite at least) hand back key-transform results
        # already converted to their SQL datatypes; those are not JSON text.
        is_text = isinstance(value, str)
        if not is_text and isinstance(expression, KeyTransform):
            return value
        try:
            decoded = json.loads(value, cls=self.decoder)
        except json.JSONDecodeError:
            return value
        return decoded

    def get_internal_type(self):
        """Return the internal type name of this field."""
        return "JSONField"

    def get_prep_value(self, value):
        """Serialize ``value`` to a JSON string; ``None`` stays ``None``."""
        return None if value is None else json.dumps(value, cls=self.encoder)

    def get_transform(self, name):
        """
        Return the transform registered under ``name``, or a factory building
        a KeyTransform for ``name`` when no such transform exists.
        """
        return super().get_transform(name) or KeyTransformFactory(name)

    def validate(self, value, model_instance):
        """
        Run the inherited validation and make sure ``value`` can be dumped.

        Raise ValidationError (code ``invalid``) when ``json.dumps`` raises
        TypeError.
        """
        super().validate(value, model_instance)
        try:
            json.dumps(value, cls=self.encoder)
        except TypeError:
            message = self.error_messages["invalid"]
            raise exceptions.ValidationError(
                message,
                code="invalid",
                params={"value": value},
            )

    def value_to_string(self, obj):
        """Return the field's value on ``obj`` as given by value_from_object()."""
        return self.value_from_object(obj)

    def formfield(self, **kwargs):
        """Build the form field; caller-supplied kwargs override the defaults."""
        options = {
            "form_class": forms.JSONField,
            "encoder": self.encoder,
            "decoder": self.decoder,
        }
        options.update(kwargs)
        return super().formfield(**options)

127 

128 

def compile_json_path(key_transforms, include_root=True):
    """
    Build a JSON path string from a sequence of key transforms.

    A key that is an integer, or a string made only of an optional sign and
    ASCII digits, becomes an array subscript (``[N]``). Every other key
    becomes a member accessor whose name is quoted with ``json.dumps``
    (``."key"``).

    Args:
        key_transforms: iterable of keys, outermost first.
        include_root: when true, the path starts with the ``$`` root marker.

    Returns:
        The compiled path as one string, e.g. ``$."a"[0]``.
    """
    path = ["$"] if include_root else []
    for key_transform in key_transforms:
        try:
            num = int(key_transform)
        except ValueError:  # non-integer
            num = None
        else:
            # int() is more lenient than an array index should be: it also
            # accepts digit-group underscores ("1_000"), surrounding
            # whitespace (" 7") and non-ASCII digits. Such strings are object
            # keys, not indexes, so only plain signed ASCII digits qualify.
            if isinstance(key_transform, str):
                digits = key_transform.lstrip("+-")
                if not (digits.isascii() and digits.isdigit()):
                    num = None
        if num is None:
            path.append(".")
            path.append(json.dumps(key_transform))
        else:
            path.append("[%s]" % num)
    return "".join(path)

140 

141 

class DataContains(PostgresOperatorLookup):
    """``contains`` lookup: the stored JSON contains the given JSON."""

    lookup_name = "contains"
    postgres_operator = "@>"

    def as_sql(self, compiler, connection):
        """
        Compile to ``JSON_CONTAINS(lhs, rhs)``.

        Raise NotSupportedError when the backend reports no support for JSON
        containment lookups.
        """
        if not connection.features.supports_json_field_contains:
            raise NotSupportedError(
                "contains lookup is not supported on this database backend."
            )
        lhs_sql, lhs_params = self.process_lhs(compiler, connection)
        rhs_sql, rhs_params = self.process_rhs(compiler, connection)
        sql = "JSON_CONTAINS(%s, %s)" % (lhs_sql, rhs_sql)
        return sql, (*lhs_params, *rhs_params)

155 

156 

class ContainedBy(PostgresOperatorLookup):
    """``contained_by`` lookup: the stored JSON is contained in the given JSON."""

    lookup_name = "contained_by"
    postgres_operator = "<@"

    def as_sql(self, compiler, connection):
        """
        Compile to ``JSON_CONTAINS(rhs, lhs)`` (operands swapped).

        Raise NotSupportedError when the backend reports no support for JSON
        containment lookups.
        """
        if not connection.features.supports_json_field_contains:
            raise NotSupportedError(
                "contained_by lookup is not supported on this database backend."
            )
        lhs_sql, lhs_params = self.process_lhs(compiler, connection)
        rhs_sql, rhs_params = self.process_rhs(compiler, connection)
        # The right-hand side comes first in the SQL, so its params do too.
        sql = "JSON_CONTAINS(%s, %s)" % (rhs_sql, lhs_sql)
        return sql, (*rhs_params, *lhs_params)

170 

171 

class HasKeyLookup(PostgresOperatorLookup):
    """
    Shared implementation of the key-existence lookups.

    Subclasses set ``logical_operator`` to the SQL text used to join the
    per-key conditions when several keys are given.
    """

    logical_operator = None

    def as_sql(self, compiler, connection, template=None):
        """
        Compile one ``template`` condition per right-hand-side key.

        ``template`` receives the compiled left-hand side via ``%``; each key
        contributes one JSON path parameter rooted at the left-hand-side path.
        """
        # Resolve the JSON path prefix contributed by the left-hand side.
        if isinstance(self.lhs, KeyTransform):
            lhs, lhs_params, lhs_keys = self.lhs.preprocess_lhs(compiler, connection)
            prefix = compile_json_path(lhs_keys)
        else:
            lhs, lhs_params = self.process_lhs(compiler, connection)
            prefix = "$"
        sql = template % lhs
        # Build one full path per right-hand-side key.
        keys = self.rhs if isinstance(self.rhs, (list, tuple)) else [self.rhs]
        rhs_params = []
        for key in keys:
            if isinstance(key, KeyTransform):
                transforms = key.preprocess_lhs(compiler, connection)[-1]
            else:
                transforms = [key]
            rhs_params.append(
                prefix + compile_json_path(transforms, include_root=False)
            )
        # Repeat the condition once per key, joined by the logical operator.
        if self.logical_operator:
            repeated = [sql] * len(rhs_params)
            sql = "(%s)" % self.logical_operator.join(repeated)
        return sql, tuple(lhs_params) + tuple(rhs_params)

    def as_mysql(self, compiler, connection):
        """Compile with MySQL's JSON_CONTAINS_PATH() template."""
        template = "JSON_CONTAINS_PATH(%s, 'one', %%s)"
        return self.as_sql(compiler, connection, template=template)

    def as_oracle(self, compiler, connection):
        """Compile with Oracle's JSON_EXISTS() template, inlining the paths."""
        template = "JSON_EXISTS(%s, '%%s')"
        sql, params = self.as_sql(compiler, connection, template=template)
        # Add paths directly into SQL because path expressions cannot be
        # passed as bind variables on Oracle.
        return sql % tuple(params), []

    def as_postgresql(self, compiler, connection):
        """
        Compile with the PostgreSQL operator.

        When the right-hand side is a KeyTransform, all of its keys except the
        last are folded into the left-hand side and the last one becomes the
        right-hand side.
        """
        if isinstance(self.rhs, KeyTransform):
            *parent_keys, final_key = self.rhs.preprocess_lhs(compiler, connection)[-1]
            for key in parent_keys:
                self.lhs = KeyTransform(key, self.lhs)
            self.rhs = final_key
        return super().as_postgresql(compiler, connection)

    def as_sqlite(self, compiler, connection):
        """Compile with SQLite's JSON_TYPE() template."""
        template = "JSON_TYPE(%s, %%s) IS NOT NULL"
        return self.as_sql(compiler, connection, template=template)

233 

234 

class HasKey(HasKeyLookup):
    """``has_key`` lookup, using the ``?`` operator on PostgreSQL."""

    lookup_name = "has_key"
    postgres_operator = "?"
    prepare_rhs = False

239 

240 

class HasKeys(HasKeyLookup):
    """``has_keys`` lookup: per-key conditions are joined with AND."""

    lookup_name = "has_keys"
    postgres_operator = "?&"
    logical_operator = " AND "

    def get_prep_lookup(self):
        """Return the right-hand-side items converted to strings."""
        return list(map(str, self.rhs))

248 

249 

class HasAnyKeys(HasKeys):
    """``has_any_keys`` lookup: per-key conditions are joined with OR."""

    lookup_name = "has_any_keys"
    postgres_operator = "?|"
    logical_operator = " OR "

254 

255 

class CaseInsensitiveMixin:
    """
    Mixin to allow case-insensitive comparison of JSON values on MySQL.
    MySQL handles strings used in JSON context using the utf8mb4_bin collation.
    Because utf8mb4_bin is a binary collation, comparison of JSON values is
    case-sensitive.
    """

    def process_lhs(self, compiler, connection):
        """Wrap the compiled left-hand side in LOWER() on MySQL only."""
        sql, params = super().process_lhs(compiler, connection)
        if connection.vendor != "mysql":
            return sql, params
        return "LOWER(%s)" % sql, params

    def process_rhs(self, compiler, connection):
        """Wrap the compiled right-hand side in LOWER() on MySQL only."""
        sql, params = super().process_rhs(compiler, connection)
        if connection.vendor != "mysql":
            return sql, params
        return "LOWER(%s)" % sql, params

275 

276 

class JSONExact(lookups.Exact):
    """Exact lookup for JSON values; ``None`` is compared as JSON ``null``."""

    can_use_none_as_rhs = True

    def process_rhs(self, compiler, connection):
        """
        Compile the right-hand side.

        A lone ``None`` parameter is replaced by the string ``"null"``; on
        MySQL every placeholder is wrapped in ``JSON_EXTRACT(..., '$')``.
        """
        rhs, rhs_params = super().process_rhs(compiler, connection)
        # Treat None lookup values as null.
        if rhs == "%s" and rhs_params == [None]:
            rhs_params = ["null"]
        if connection.vendor == "mysql":
            wrappers = ("JSON_EXTRACT(%s, '$')",) * len(rhs_params)
            rhs = rhs % wrappers
        return rhs, rhs_params

289 

290 

class JSONIContains(CaseInsensitiveMixin, lookups.IContains):
    """``icontains`` lookup combined with CaseInsensitiveMixin."""

293 

294 

# Register the JSONField lookups, in this order.
for _json_lookup in (
    DataContains,
    ContainedBy,
    HasKey,
    HasKeys,
    HasAnyKeys,
    JSONExact,
    JSONIContains,
):
    JSONField.register_lookup(_json_lookup)
del _json_lookup

302 

303 

class KeyTransform(Transform):
    """Transform extracting the value stored under one key of a JSON value."""

    postgres_operator = "->"
    postgres_nested_operator = "#>"

    def __init__(self, key_name, *args, **kwargs):
        """Store ``key_name`` as a string; other arguments go to Transform."""
        super().__init__(*args, **kwargs)
        self.key_name = str(key_name)

    def preprocess_lhs(self, compiler, connection):
        """
        Flatten a chain of nested KeyTransforms.

        Return ``(lhs_sql, params, key_transforms)`` where ``key_transforms``
        lists the key names from the outermost to this one and ``lhs_sql`` is
        the compiled innermost non-KeyTransform expression.
        """
        innermost_last = [self.key_name]
        node = self.lhs
        while isinstance(node, KeyTransform):
            innermost_last.append(node.key_name)
            node = node.lhs
        key_transforms = innermost_last[::-1]
        lhs, params = compiler.compile(node)
        if connection.vendor == "oracle":
            # Escape string-formatting.
            key_transforms = [key.replace("%", "%%") for key in key_transforms]
        return lhs, params, key_transforms

    def as_mysql(self, compiler, connection):
        """Compile to JSON_EXTRACT() with the path passed as a parameter."""
        lhs, params, key_transforms = self.preprocess_lhs(compiler, connection)
        path_param = (compile_json_path(key_transforms),)
        return "JSON_EXTRACT(%s, %%s)" % lhs, tuple(params) + path_param

    def as_oracle(self, compiler, connection):
        """Compile to COALESCE(JSON_QUERY(), JSON_VALUE()) with an inlined path."""
        lhs, params, key_transforms = self.preprocess_lhs(compiler, connection)
        json_path = compile_json_path(key_transforms)
        sql = "COALESCE(JSON_QUERY(%s, '%s'), JSON_VALUE(%s, '%s'))" % (
            lhs,
            json_path,
            lhs,
            json_path,
        )
        # The left-hand side appears twice in the SQL, so its params do too.
        return sql, tuple(params) * 2

    def as_postgresql(self, compiler, connection):
        """
        Compile with the nested operator for multi-key chains, otherwise with
        the single-key operator (integer-like key names are passed as ints).
        """
        lhs, params, key_transforms = self.preprocess_lhs(compiler, connection)
        if len(key_transforms) > 1:
            operator = self.postgres_nested_operator
            operand = key_transforms
        else:
            operator = self.postgres_operator
            try:
                operand = int(self.key_name)
            except ValueError:
                operand = self.key_name
        return "(%s %s %%s)" % (lhs, operator), tuple(params) + (operand,)

    def as_sqlite(self, compiler, connection):
        """
        Compile to a CASE expression yielding JSON_TYPE() for the datatypes
        listed in ``connection.ops.jsonfield_datatype_values`` and
        JSON_EXTRACT() otherwise.
        """
        lhs, params, key_transforms = self.preprocess_lhs(compiler, connection)
        json_path = compile_json_path(key_transforms)
        datatype_values = ",".join(
            repr(datatype) for datatype in connection.ops.jsonfield_datatype_values
        )
        sql = (
            "(CASE WHEN JSON_TYPE(%s, %%s) IN (%s) "
            "THEN JSON_TYPE(%s, %%s) ELSE JSON_EXTRACT(%s, %%s) END)"
        ) % (lhs, datatype_values, lhs, lhs)
        # Each of the three (lhs, path) occurrences needs its own params.
        per_occurrence = tuple(params) + (json_path,)
        return sql, per_occurrence * 3

358 

359 

class KeyTextTransform(KeyTransform):
    """KeyTransform variant using the ``->>`` / ``#>>`` PostgreSQL operators."""

    postgres_operator = "->>"
    postgres_nested_operator = "#>>"

363 

364 

class KeyTransformTextLookupMixin:
    """
    Mixin for combining with a lookup expecting a text lhs from a JSONField
    key lookup. On PostgreSQL, make use of the ->> operator instead of casting
    key values to text and performing the lookup on the resulting
    representation.
    """

    def __init__(self, key_transform, *args, **kwargs):
        """
        Replace ``key_transform`` with an equivalent KeyTextTransform.

        Raise TypeError when ``key_transform`` is not a KeyTransform.
        """
        if not isinstance(key_transform, KeyTransform):
            raise TypeError(
                "Transform should be an instance of KeyTransform in order to "
                "use this lookup."
            )
        # Rebuild the transform from the same key, sources and extras.
        text_transform = KeyTextTransform(
            key_transform.key_name,
            *key_transform.source_expressions,
            **key_transform.extra,
        )
        super().__init__(text_transform, *args, **kwargs)

385 

386 

class KeyTransformIsNull(lookups.IsNull):
    """``isnull`` lookup on a key; key__isnull=False is the same as has_key='key'."""

    def as_oracle(self, compiler, connection):
        """Compile via HasKey on Oracle, negating it when ``rhs`` is truthy."""
        has_key = HasKey(self.lhs.lhs, self.lhs.key_name)
        sql, params = has_key.as_oracle(compiler, connection)
        if not self.rhs:
            return sql, params
        # Column doesn't have a key or IS NULL.
        lhs, lhs_params, _ = self.lhs.preprocess_lhs(compiler, connection)
        negated = "(NOT %s OR %s IS NULL)" % (sql, lhs)
        return negated, tuple(params) + tuple(lhs_params)

    def as_sqlite(self, compiler, connection):
        """Compile via HasKey.as_sql() with a JSON_TYPE() null test on SQLite."""
        if self.rhs:
            template = "JSON_TYPE(%s, %%s) IS NULL"
        else:
            template = "JSON_TYPE(%s, %%s) IS NOT NULL"
        has_key = HasKey(self.lhs.lhs, self.lhs.key_name)
        return has_key.as_sql(compiler, connection, template=template)

409 

410 

class KeyTransformIn(lookups.In):
    """``in`` lookup on a key, wrapping plain parameters per backend."""

    def resolve_expression_parameter(self, compiler, connection, sql, param):
        """
        Resolve one right-hand-side parameter.

        Plain values (no ``as_sql``) on backends without a native JSON field
        get a vendor-specific wrapper; on MariaDB the result is additionally
        wrapped in JSON_UNQUOTE().
        """
        sql, params = super().resolve_expression_parameter(
            compiler,
            connection,
            sql,
            param,
        )
        vendor = connection.vendor
        is_expression = hasattr(param, "as_sql")
        if not (is_expression or connection.features.has_native_json_field):
            if vendor == "oracle":
                loaded = json.loads(param)
                # Containers need JSON_QUERY; scalars need JSON_VALUE.
                if isinstance(loaded, (list, dict)):
                    extractor = "JSON_QUERY"
                else:
                    extractor = "JSON_VALUE"
                sql = (
                    "%s(JSON_OBJECT('value' VALUE %%s FORMAT JSON), '$.value')"
                    % extractor
                )
            elif vendor == "mysql" or (
                vendor == "sqlite"
                and params[0] not in connection.ops.jsonfield_datatype_values
            ):
                sql = "JSON_EXTRACT(%s, '$')"
        if vendor == "mysql" and connection.mysql_is_mariadb:
            sql = "JSON_UNQUOTE(%s)" % sql
        return sql, params

438 

439 

class KeyTransformExact(JSONExact):
    """Exact lookup on a key, with Oracle- and SQLite-specific wrapping."""

    def process_rhs(self, compiler, connection):
        """
        Compile the right-hand side.

        A KeyTransform right-hand side bypasses the JSON-specific handling.
        Otherwise each placeholder is wrapped per parameter on Oracle and
        SQLite.
        """
        if isinstance(self.rhs, KeyTransform):
            return super(lookups.Exact, self).process_rhs(compiler, connection)
        rhs, rhs_params = super().process_rhs(compiler, connection)
        vendor = connection.vendor
        if vendor == "oracle":
            wrapper = "%s(JSON_OBJECT('value' VALUE %%s FORMAT JSON), '$.value')"
            # Containers need JSON_QUERY; scalars need JSON_VALUE.
            rhs = rhs % tuple(
                wrapper
                % (
                    "JSON_QUERY"
                    if isinstance(json.loads(value), (list, dict))
                    else "JSON_VALUE"
                )
                for value in rhs_params
            )
        elif vendor == "sqlite":
            rhs = rhs % tuple(
                "%s"
                if value in connection.ops.jsonfield_datatype_values
                else "JSON_EXTRACT(%s, '$')"
                for value in rhs_params
            )
        return rhs, rhs_params

    def as_oracle(self, compiler, connection):
        """
        Compile for Oracle.

        A ``"null"`` right-hand side becomes "has the key AND is null";
        anything else falls back to the regular SQL.
        """
        rhs, rhs_params = super().process_rhs(compiler, connection)
        if rhs_params != ["null"]:
            return super().as_sql(compiler, connection)
        # Field has key and it's NULL.
        has_key = HasKey(self.lhs.lhs, self.lhs.key_name)
        has_key_sql, has_key_params = has_key.as_oracle(compiler, connection)
        is_null = self.lhs.get_lookup("isnull")(self.lhs, True)
        is_null_sql, is_null_params = is_null.as_sql(compiler, connection)
        combined = "%s AND %s" % (has_key_sql, is_null_sql)
        return combined, tuple(has_key_params) + tuple(is_null_params)

478 

479 

class KeyTransformIExact(
    CaseInsensitiveMixin,
    KeyTransformTextLookupMixin,
    lookups.IExact,
):
    """``iexact`` lookup on a key's text value."""

484 

485 

class KeyTransformIContains(
    CaseInsensitiveMixin,
    KeyTransformTextLookupMixin,
    lookups.IContains,
):
    """``icontains`` lookup on a key's text value."""

490 

491 

class KeyTransformStartsWith(KeyTransformTextLookupMixin, lookups.StartsWith):
    """``startswith`` lookup on a key's text value."""

494 

495 

class KeyTransformIStartsWith(
    CaseInsensitiveMixin,
    KeyTransformTextLookupMixin,
    lookups.IStartsWith,
):
    """``istartswith`` lookup on a key's text value."""

500 

501 

class KeyTransformEndsWith(KeyTransformTextLookupMixin, lookups.EndsWith):
    """``endswith`` lookup on a key's text value."""

504 

505 

class KeyTransformIEndsWith(
    CaseInsensitiveMixin,
    KeyTransformTextLookupMixin,
    lookups.IEndsWith,
):
    """``iendswith`` lookup on a key's text value."""

510 

511 

class KeyTransformRegex(KeyTransformTextLookupMixin, lookups.Regex):
    """``regex`` lookup on a key's text value."""

514 

515 

class KeyTransformIRegex(
    CaseInsensitiveMixin,
    KeyTransformTextLookupMixin,
    lookups.IRegex,
):
    """``iregex`` lookup on a key's text value."""

520 

521 

class KeyTransformNumericLookupMixin:
    """Mixin decoding JSON-encoded parameters for comparison lookups."""

    def process_rhs(self, compiler, connection):
        """
        Compile the right-hand side; on backends without a native JSON field,
        decode every parameter with ``json.loads``.
        """
        rhs, rhs_params = super().process_rhs(compiler, connection)
        if connection.features.has_native_json_field:
            return rhs, rhs_params
        return rhs, list(map(json.loads, rhs_params))

528 

529 

class KeyTransformLt(KeyTransformNumericLookupMixin, lookups.LessThan):
    """``lt`` lookup on a key."""

532 

533 

class KeyTransformLte(KeyTransformNumericLookupMixin, lookups.LessThanOrEqual):
    """``lte`` lookup on a key."""

536 

537 

class KeyTransformGt(KeyTransformNumericLookupMixin, lookups.GreaterThan):
    """``gt`` lookup on a key."""

540 

541 

class KeyTransformGte(KeyTransformNumericLookupMixin, lookups.GreaterThanOrEqual):
    """``gte`` lookup on a key."""

544 

545 

# Register the KeyTransform lookups, in this order: the membership, equality,
# null and text lookups first, then the comparison lookups.
for _key_lookup in (
    KeyTransformIn,
    KeyTransformExact,
    KeyTransformIExact,
    KeyTransformIsNull,
    KeyTransformIContains,
    KeyTransformStartsWith,
    KeyTransformIStartsWith,
    KeyTransformEndsWith,
    KeyTransformIEndsWith,
    KeyTransformRegex,
    KeyTransformIRegex,
    KeyTransformLt,
    KeyTransformLte,
    KeyTransformGt,
    KeyTransformGte,
):
    KeyTransform.register_lookup(_key_lookup)
del _key_lookup

562 

563 

class KeyTransformFactory:
    """Callable that builds a KeyTransform for a fixed key name."""

    def __init__(self, key_name):
        """Remember the key name handed to every transform built later."""
        self.key_name = key_name

    def __call__(self, *args, **kwargs):
        """Return ``KeyTransform(key_name, *args, **kwargs)``."""
        key = self.key_name
        return KeyTransform(key, *args, **kwargs)