Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/db/models/fields/json.py: 48%
341 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1import json
3from django import forms
4from django.core import checks, exceptions
5from django.db import NotSupportedError, connections, router
6from django.db.models import lookups
7from django.db.models.lookups import PostgresOperatorLookup, Transform
8from django.utils.translation import gettext_lazy as _
10from . import Field
11from .mixins import CheckFieldDefaultMixin
13__all__ = ["JSONField"]
class JSONField(CheckFieldDefaultMixin, Field):
    """Model field that stores arbitrary JSON-encodable data.

    Values are serialized with ``json.dumps`` (optionally via a custom
    ``encoder``) on the way to the database and parsed with ``json.loads``
    (optionally via a custom ``decoder``) on the way out.
    """

    empty_strings_allowed = False
    description = _("A JSON object")
    default_error_messages = {
        "invalid": _("Value must be valid JSON."),
    }
    _default_hint = ("dict", "{}")

    def __init__(
        self,
        verbose_name=None,
        name=None,
        encoder=None,
        decoder=None,
        **kwargs,
    ):
        # Custom (de)serializers must be callables (classes or functions).
        if encoder and not callable(encoder):
            raise ValueError("The encoder parameter must be a callable object.")
        if decoder and not callable(decoder):
            raise ValueError("The decoder parameter must be a callable object.")
        self.encoder = encoder
        self.decoder = decoder
        super().__init__(verbose_name, name, **kwargs)

    def check(self, **kwargs):
        """Run the standard field checks plus per-database support checks."""
        errors = super().check(**kwargs)
        errors.extend(self._check_supported(kwargs.get("databases") or []))
        return errors

    def _check_supported(self, databases):
        """Return fields.E180 errors for databases lacking JSON support."""
        errors = []
        for alias in databases:
            if not router.allow_migrate_model(alias, self.model):
                continue
            connection = connections[alias]
            required_vendor = self.model._meta.required_db_vendor
            # Skip databases the model is explicitly restricted away from.
            if required_vendor and required_vendor != connection.vendor:
                continue
            supported = (
                "supports_json_field" in self.model._meta.required_db_features
                or connection.features.supports_json_field
            )
            if not supported:
                errors.append(
                    checks.Error(
                        "%s does not support JSONFields." % connection.display_name,
                        obj=self.model,
                        id="fields.E180",
                    )
                )
        return errors

    def deconstruct(self):
        """Serialize only non-default encoder/decoder into migrations."""
        name, path, args, kwargs = super().deconstruct()
        if self.encoder is not None:
            kwargs["encoder"] = self.encoder
        if self.decoder is not None:
            kwargs["decoder"] = self.decoder
        return name, path, args, kwargs

    def from_db_value(self, value, expression, connection):
        """Parse a database value back into Python data."""
        if value is None:
            return value
        # Some backends (SQLite at least) extract non-string values in their
        # SQL datatypes.
        if isinstance(expression, KeyTransform) and not isinstance(value, str):
            return value
        try:
            return json.loads(value, cls=self.decoder)
        except json.JSONDecodeError:
            # Fall back to the raw value if it isn't valid JSON.
            return value

    def get_internal_type(self):
        return "JSONField"

    def get_prep_value(self, value):
        """Serialize Python data to a JSON string for the database."""
        if value is None:
            return value
        return json.dumps(value, cls=self.encoder)

    def get_transform(self, name):
        """Resolve unknown transform names as JSON key lookups."""
        transform = super().get_transform(name)
        return transform if transform else KeyTransformFactory(name)

    def validate(self, value, model_instance):
        super().validate(value, model_instance)
        # Anything json.dumps() cannot encode is invalid for this field.
        try:
            json.dumps(value, cls=self.encoder)
        except TypeError:
            raise exceptions.ValidationError(
                self.error_messages["invalid"],
                code="invalid",
                params={"value": value},
            )

    def value_to_string(self, obj):
        return self.value_from_object(obj)

    def formfield(self, **kwargs):
        """Use forms.JSONField with this field's encoder/decoder by default."""
        return super().formfield(
            **{
                "form_class": forms.JSONField,
                "encoder": self.encoder,
                "decoder": self.decoder,
                **kwargs,
            }
        )
def compile_json_path(key_transforms, include_root=True):
    """Build a JSONPath expression from a sequence of keys.

    Integer-like keys become array indexes (``[3]``); all other keys become
    quoted member accesses (``."name"``). With ``include_root=False`` the
    leading ``$`` is omitted so the result can extend an existing path.
    """
    parts = ["$"] if include_root else []
    for key in key_transforms:
        try:
            index = int(key)
        except ValueError:
            # Non-integer key: quote it as an object member name.
            parts.append(".%s" % json.dumps(key))
        else:
            parts.append("[%s]" % index)
    return "".join(parts)
class DataContains(PostgresOperatorLookup):
    """``contains`` lookup: the LHS JSON document contains the RHS fragment."""

    lookup_name = "contains"
    postgres_operator = "@>"

    def as_sql(self, compiler, connection):
        # Generic (MySQL-style) SQL; PostgreSQL uses the @> operator instead.
        if not connection.features.supports_json_field_contains:
            raise NotSupportedError(
                "contains lookup is not supported on this database backend."
            )
        lhs_sql, lhs_params = self.process_lhs(compiler, connection)
        rhs_sql, rhs_params = self.process_rhs(compiler, connection)
        return (
            "JSON_CONTAINS(%s, %s)" % (lhs_sql, rhs_sql),
            (*lhs_params, *rhs_params),
        )
class ContainedBy(PostgresOperatorLookup):
    """``contained_by`` lookup: the RHS JSON document contains the LHS."""

    lookup_name = "contained_by"
    postgres_operator = "<@"

    def as_sql(self, compiler, connection):
        # Same as DataContains but with the operands swapped.
        if not connection.features.supports_json_field_contains:
            raise NotSupportedError(
                "contained_by lookup is not supported on this database backend."
            )
        lhs_sql, lhs_params = self.process_lhs(compiler, connection)
        rhs_sql, rhs_params = self.process_rhs(compiler, connection)
        return (
            "JSON_CONTAINS(%s, %s)" % (rhs_sql, lhs_sql),
            (*rhs_params, *lhs_params),
        )
class HasKeyLookup(PostgresOperatorLookup):
    """Base class for the has_key / has_keys / has_any_keys lookups."""

    # Joined between per-key conditions (" AND "/" OR " in subclasses);
    # None means a single-key lookup.
    logical_operator = None

    def as_sql(self, compiler, connection, template=None):
        """Build path-existence SQL; ``template`` holds one %s for the LHS
        column and is supplied by the vendor-specific wrappers below."""
        # Process JSON path from the left-hand side.
        if isinstance(self.lhs, KeyTransform):
            lhs, lhs_params, lhs_key_transforms = self.lhs.preprocess_lhs(
                compiler, connection
            )
            lhs_json_path = compile_json_path(lhs_key_transforms)
        else:
            lhs, lhs_params = self.process_lhs(compiler, connection)
            lhs_json_path = "$"
        sql = template % lhs
        # Process JSON path from the right-hand side.
        rhs = self.rhs
        rhs_params = []
        if not isinstance(rhs, (list, tuple)):
            rhs = [rhs]
        for key in rhs:
            if isinstance(key, KeyTransform):
                *_, rhs_key_transforms = key.preprocess_lhs(compiler, connection)
            else:
                rhs_key_transforms = [key]
            # Each tested key becomes a full JSON path rooted at the LHS path.
            rhs_params.append(
                "%s%s"
                % (
                    lhs_json_path,
                    compile_json_path(rhs_key_transforms, include_root=False),
                )
            )
        # Add condition for each key.
        if self.logical_operator:
            sql = "(%s)" % self.logical_operator.join([sql] * len(rhs_params))
        return sql, tuple(lhs_params) + tuple(rhs_params)

    def as_mysql(self, compiler, connection):
        return self.as_sql(
            compiler, connection, template="JSON_CONTAINS_PATH(%s, 'one', %%s)"
        )

    def as_oracle(self, compiler, connection):
        sql, params = self.as_sql(
            compiler, connection, template="JSON_EXISTS(%s, '%%s')"
        )
        # Add paths directly into SQL because path expressions cannot be passed
        # as bind variables on Oracle.
        return sql % tuple(params), []

    def as_postgresql(self, compiler, connection):
        # Unwind a KeyTransform RHS: fold all but the last key into the LHS
        # so only the final key is tested by the operator.
        if isinstance(self.rhs, KeyTransform):
            *_, rhs_key_transforms = self.rhs.preprocess_lhs(compiler, connection)
            for key in rhs_key_transforms[:-1]:
                self.lhs = KeyTransform(key, self.lhs)
            self.rhs = rhs_key_transforms[-1]
        return super().as_postgresql(compiler, connection)

    def as_sqlite(self, compiler, connection):
        return self.as_sql(
            compiler, connection, template="JSON_TYPE(%s, %%s) IS NOT NULL"
        )
class HasKey(HasKeyLookup):
    """``has_key`` lookup: test for the presence of a single key."""

    lookup_name = "has_key"
    postgres_operator = "?"
    prepare_rhs = False
class HasKeys(HasKeyLookup):
    """``has_keys`` lookup: all of the given keys must be present."""

    lookup_name = "has_keys"
    postgres_operator = "?&"
    logical_operator = " AND "

    def get_prep_lookup(self):
        # Keys are always compared as strings.
        return list(map(str, self.rhs))
class HasAnyKeys(HasKeys):
    """``has_any_keys`` lookup: at least one of the given keys is present."""

    lookup_name = "has_any_keys"
    postgres_operator = "?|"
    logical_operator = " OR "
class CaseInsensitiveMixin:
    """
    Mixin to allow case-insensitive comparison of JSON values on MySQL.
    MySQL handles strings used in JSON context using the utf8mb4_bin collation.
    Because utf8mb4_bin is a binary collation, comparison of JSON values is
    case-sensitive.
    """

    def _lowered(self, sql, connection):
        # Wrap in LOWER() only on MySQL; other vendors compare as-is.
        return "LOWER(%s)" % sql if connection.vendor == "mysql" else sql

    def process_lhs(self, compiler, connection):
        lhs, lhs_params = super().process_lhs(compiler, connection)
        return self._lowered(lhs, connection), lhs_params

    def process_rhs(self, compiler, connection):
        rhs, rhs_params = super().process_rhs(compiler, connection)
        return self._lowered(rhs, connection), rhs_params
class JSONExact(lookups.Exact):
    """Exact comparison of JSON values; a None RHS is treated as JSON null."""

    can_use_none_as_rhs = True

    def process_rhs(self, compiler, connection):
        rhs, rhs_params = super().process_rhs(compiler, connection)
        # Treat None lookup values as null.
        if rhs == "%s" and rhs_params == [None]:
            rhs_params = ["null"]
        if connection.vendor == "mysql":
            # Route every placeholder through JSON_EXTRACT so scalars
            # compare consistently on MySQL.
            rhs = rhs % tuple(["JSON_EXTRACT(%s, '$')"] * len(rhs_params))
        return rhs, rhs_params
class JSONIContains(CaseInsensitiveMixin, lookups.IContains):
    """Case-insensitive substring match against the JSON text representation."""
# Register the containment, key-existence, and comparison lookups on the
# field itself; key-level lookups are registered on KeyTransform below.
JSONField.register_lookup(DataContains)
JSONField.register_lookup(ContainedBy)
JSONField.register_lookup(HasKey)
JSONField.register_lookup(HasKeys)
JSONField.register_lookup(HasAnyKeys)
JSONField.register_lookup(JSONExact)
JSONField.register_lookup(JSONIContains)
class KeyTransform(Transform):
    """Transform that descends into a JSON document by one key or index."""

    postgres_operator = "->"
    postgres_nested_operator = "#>"

    def __init__(self, key_name, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Keys are always stored as strings; compile_json_path() decides
        # whether they act as array indexes or member names.
        self.key_name = str(key_name)

    def preprocess_lhs(self, compiler, connection):
        """Return (lhs_sql, params, key_transforms) for the innermost
        non-KeyTransform expression, collecting the chain of nested keys
        outermost-last."""
        key_transforms = [self.key_name]
        previous = self.lhs
        while isinstance(previous, KeyTransform):
            key_transforms.insert(0, previous.key_name)
            previous = previous.lhs
        lhs, params = compiler.compile(previous)
        if connection.vendor == "oracle":
            # Escape string-formatting.
            key_transforms = [key.replace("%", "%%") for key in key_transforms]
        return lhs, params, key_transforms

    def as_mysql(self, compiler, connection):
        lhs, params, key_transforms = self.preprocess_lhs(compiler, connection)
        json_path = compile_json_path(key_transforms)
        return "JSON_EXTRACT(%s, %%s)" % lhs, tuple(params) + (json_path,)

    def as_oracle(self, compiler, connection):
        lhs, params, key_transforms = self.preprocess_lhs(compiler, connection)
        json_path = compile_json_path(key_transforms)
        # JSON_QUERY() matches objects/arrays and JSON_VALUE() matches
        # scalars; COALESCE picks whichever one found a value.
        return (
            "COALESCE(JSON_QUERY(%s, '%s'), JSON_VALUE(%s, '%s'))"
            % ((lhs, json_path) * 2)
        ), tuple(params) * 2

    def as_postgresql(self, compiler, connection):
        lhs, params, key_transforms = self.preprocess_lhs(compiler, connection)
        if len(key_transforms) > 1:
            # Nested keys use the #> operator with the key list as one param.
            sql = "(%s %s %%s)" % (lhs, self.postgres_nested_operator)
            return sql, tuple(params) + (key_transforms,)
        # A single key: pass an int for array indexing, str for member access.
        try:
            lookup = int(self.key_name)
        except ValueError:
            lookup = self.key_name
        return "(%s %s %%s)" % (lhs, self.postgres_operator), tuple(params) + (lookup,)

    def as_sqlite(self, compiler, connection):
        lhs, params, key_transforms = self.preprocess_lhs(compiler, connection)
        json_path = compile_json_path(key_transforms)
        datatype_values = ",".join(
            [repr(datatype) for datatype in connection.ops.jsonfield_datatype_values]
        )
        # The same (lhs, json_path) pair feeds all three placeholders, hence
        # the params tuple is repeated three times.
        return (
            "(CASE WHEN JSON_TYPE(%s, %%s) IN (%s) "
            "THEN JSON_TYPE(%s, %%s) ELSE JSON_EXTRACT(%s, %%s) END)"
        ) % (lhs, datatype_values, lhs, lhs), (tuple(params) + (json_path,)) * 3
class KeyTextTransform(KeyTransform):
    """KeyTransform variant that extracts values as text (``->>``/``#>>``)."""

    postgres_operator = "->>"
    postgres_nested_operator = "#>>"
class KeyTransformTextLookupMixin:
    """
    Mixin for combining with a lookup expecting a text lhs from a JSONField
    key lookup. On PostgreSQL, make use of the ->> operator instead of casting
    key values to text and performing the lookup on the resulting
    representation.
    """

    def __init__(self, key_transform, *args, **kwargs):
        if not isinstance(key_transform, KeyTransform):
            raise TypeError(
                "Transform should be an instance of KeyTransform in order to "
                "use this lookup."
            )
        # Swap the key transform for its text-returning counterpart.
        super().__init__(
            KeyTextTransform(
                key_transform.key_name,
                *key_transform.source_expressions,
                **key_transform.extra,
            ),
            *args,
            **kwargs,
        )
class KeyTransformIsNull(lookups.IsNull):
    # key__isnull=False is the same as has_key='key'
    def as_oracle(self, compiler, connection):
        sql, params = HasKey(
            self.lhs.lhs,
            self.lhs.key_name,
        ).as_oracle(compiler, connection)
        if not self.rhs:
            # isnull=False: the key simply has to exist.
            return sql, params
        # Column doesn't have a key or IS NULL.
        lhs, lhs_params, _ = self.lhs.preprocess_lhs(compiler, connection)
        return "(NOT %s OR %s IS NULL)" % (sql, lhs), tuple(params) + tuple(lhs_params)

    def as_sqlite(self, compiler, connection):
        # JSON_TYPE() returns NULL when the path does not exist, so the
        # template is inverted depending on the isnull value being tested.
        template = "JSON_TYPE(%s, %%s) IS NULL"
        if not self.rhs:
            template = "JSON_TYPE(%s, %%s) IS NOT NULL"
        return HasKey(self.lhs.lhs, self.lhs.key_name).as_sql(
            compiler,
            connection,
            template=template,
        )
class KeyTransformIn(lookups.In):
    def resolve_expression_parameter(self, compiler, connection, sql, param):
        """Wrap each plain RHS parameter in a JSON expression so it compares
        consistently with what KeyTransform produces for the LHS on backends
        without a native JSON column type."""
        sql, params = super().resolve_expression_parameter(
            compiler,
            connection,
            sql,
            param,
        )
        if (
            not hasattr(param, "as_sql")
            and not connection.features.has_native_json_field
        ):
            if connection.vendor == "oracle":
                value = json.loads(param)
                sql = "%s(JSON_OBJECT('value' VALUE %%s FORMAT JSON), '$.value')"
                # JSON_QUERY() for objects/arrays, JSON_VALUE() for scalars.
                if isinstance(value, (list, dict)):
                    sql = sql % "JSON_QUERY"
                else:
                    sql = sql % "JSON_VALUE"
            elif connection.vendor == "mysql" or (
                connection.vendor == "sqlite"
                and params[0] not in connection.ops.jsonfield_datatype_values
            ):
                sql = "JSON_EXTRACT(%s, '$')"
        if connection.vendor == "mysql" and connection.mysql_is_mariadb:
            # MariaDB's JSON_EXTRACT() keeps quotes around strings.
            sql = "JSON_UNQUOTE(%s)" % sql
        return sql, params
class KeyTransformExact(JSONExact):
    def process_rhs(self, compiler, connection):
        # Key-to-key comparison: skip JSONExact's null/MySQL rewriting.
        if isinstance(self.rhs, KeyTransform):
            return super(lookups.Exact, self).process_rhs(compiler, connection)
        rhs, rhs_params = super().process_rhs(compiler, connection)
        if connection.vendor == "oracle":
            func = []
            sql = "%s(JSON_OBJECT('value' VALUE %%s FORMAT JSON), '$.value')"
            for value in rhs_params:
                value = json.loads(value)
                # JSON_QUERY() for objects/arrays, JSON_VALUE() for scalars.
                if isinstance(value, (list, dict)):
                    func.append(sql % "JSON_QUERY")
                else:
                    func.append(sql % "JSON_VALUE")
            rhs = rhs % tuple(func)
        elif connection.vendor == "sqlite":
            func = []
            for value in rhs_params:
                # Datatype sentinel values pass through unchanged.
                if value in connection.ops.jsonfield_datatype_values:
                    func.append("%s")
                else:
                    func.append("JSON_EXTRACT(%s, '$')")
            rhs = rhs % tuple(func)
        return rhs, rhs_params

    def as_oracle(self, compiler, connection):
        rhs, rhs_params = super().process_rhs(compiler, connection)
        if rhs_params == ["null"]:
            # Field has key and it's NULL.
            has_key_expr = HasKey(self.lhs.lhs, self.lhs.key_name)
            has_key_sql, has_key_params = has_key_expr.as_oracle(compiler, connection)
            is_null_expr = self.lhs.get_lookup("isnull")(self.lhs, True)
            is_null_sql, is_null_params = is_null_expr.as_sql(compiler, connection)
            return (
                "%s AND %s" % (has_key_sql, is_null_sql),
                tuple(has_key_params) + tuple(is_null_params),
            )
        return super().as_sql(compiler, connection)
class KeyTransformIExact(
    CaseInsensitiveMixin, KeyTransformTextLookupMixin, lookups.IExact
):
    """Case-insensitive exact match on the text form of a JSON key value."""
class KeyTransformIContains(
    CaseInsensitiveMixin, KeyTransformTextLookupMixin, lookups.IContains
):
    """Case-insensitive substring match on the text form of a JSON key value."""
class KeyTransformStartsWith(KeyTransformTextLookupMixin, lookups.StartsWith):
    """``startswith`` lookup on the text form of a JSON key value."""
class KeyTransformIStartsWith(
    CaseInsensitiveMixin, KeyTransformTextLookupMixin, lookups.IStartsWith
):
    """Case-insensitive ``startswith`` on the text form of a JSON key value."""
class KeyTransformEndsWith(KeyTransformTextLookupMixin, lookups.EndsWith):
    """``endswith`` lookup on the text form of a JSON key value."""
class KeyTransformIEndsWith(
    CaseInsensitiveMixin, KeyTransformTextLookupMixin, lookups.IEndsWith
):
    """Case-insensitive ``endswith`` on the text form of a JSON key value."""
class KeyTransformRegex(KeyTransformTextLookupMixin, lookups.Regex):
    """``regex`` lookup on the text form of a JSON key value."""
class KeyTransformIRegex(
    CaseInsensitiveMixin, KeyTransformTextLookupMixin, lookups.IRegex
):
    """Case-insensitive ``regex`` on the text form of a JSON key value."""
class KeyTransformNumericLookupMixin:
    """Decode RHS parameters on backends without a native JSON column type."""

    def process_rhs(self, compiler, connection):
        rhs, rhs_params = super().process_rhs(compiler, connection)
        if not connection.features.has_native_json_field:
            # Params arrive JSON-encoded; decode so they compare numerically.
            rhs_params = [json.loads(param) for param in rhs_params]
        return rhs, rhs_params
class KeyTransformLt(KeyTransformNumericLookupMixin, lookups.LessThan):
    """``lt`` comparison on a JSON key value."""
class KeyTransformLte(KeyTransformNumericLookupMixin, lookups.LessThanOrEqual):
    """``lte`` comparison on a JSON key value."""
class KeyTransformGt(KeyTransformNumericLookupMixin, lookups.GreaterThan):
    """``gt`` comparison on a JSON key value."""
class KeyTransformGte(KeyTransformNumericLookupMixin, lookups.GreaterThanOrEqual):
    """``gte`` comparison on a JSON key value."""
# Register key-level lookups on KeyTransform so they apply to expressions
# such as field__key__lookup.
KeyTransform.register_lookup(KeyTransformIn)
KeyTransform.register_lookup(KeyTransformExact)
KeyTransform.register_lookup(KeyTransformIExact)
KeyTransform.register_lookup(KeyTransformIsNull)
KeyTransform.register_lookup(KeyTransformIContains)
KeyTransform.register_lookup(KeyTransformStartsWith)
KeyTransform.register_lookup(KeyTransformIStartsWith)
KeyTransform.register_lookup(KeyTransformEndsWith)
KeyTransform.register_lookup(KeyTransformIEndsWith)
KeyTransform.register_lookup(KeyTransformRegex)
KeyTransform.register_lookup(KeyTransformIRegex)

KeyTransform.register_lookup(KeyTransformLt)
KeyTransform.register_lookup(KeyTransformLte)
KeyTransform.register_lookup(KeyTransformGt)
KeyTransform.register_lookup(KeyTransformGte)
class KeyTransformFactory:
    """Callable that builds KeyTransform instances bound to a fixed key name."""

    def __init__(self, key_name):
        # Stored as-is; KeyTransform.__init__ handles str() conversion.
        self.key_name = key_name

    def __call__(self, *args, **kwargs):
        return KeyTransform(self.key_name, *args, **kwargs)