Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/db/models/functions/text.py: 70%
165 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1from django.db import NotSupportedError
2from django.db.models.expressions import Func, Value
3from django.db.models.fields import CharField, IntegerField
4from django.db.models.functions import Coalesce
5from django.db.models.lookups import Transform
class MySQLSHA2Mixin:
    """Mixin that compiles a hash function through ``SHA2()`` on MySQL.

    The second argument given to ``SHA2()`` is whatever follows the first
    three characters of ``self.function`` (e.g. ``"SHA256"`` yields ``256``).
    """

    def as_mysql(self, compiler, connection, **extra_context):
        """Delegate to the next ``as_sql()`` in the MRO with a SHA2 template.

        Any additional keyword arguments are forwarded unchanged. The
        var-keyword is named ``extra_context`` to match the other compiler
        hooks in this module.
        """
        bit_length = self.function[3:]
        return super().as_sql(
            compiler,
            connection,
            # "%(expressions)s" is left as a placeholder for as_sql().
            template=f"SHA2(%(expressions)s, {bit_length})",
            **extra_context,
        )
class OracleHashMixin:
    """Mixin that compiles a hash function through STANDARD_HASH on Oracle."""

    def as_oracle(self, compiler, connection, **extra_context):
        """Delegate to the next ``as_sql()`` in the MRO with an Oracle template.

        The template converts the expressions to raw AL32UTF8 bytes, hashes
        them with ``%(function)s`` and renders the result as lowercase hex.
        Extra keyword arguments are forwarded unchanged.
        """
        oracle_template = (
            "LOWER(RAWTOHEX(STANDARD_HASH(UTL_I18N.STRING_TO_RAW("
            "%(expressions)s, 'AL32UTF8'), '%(function)s')))"
        )
        return super().as_sql(
            compiler, connection, template=oracle_template, **extra_context
        )
class PostgreSQLSHAMixin:
    """Mixin that compiles a hash function through ``DIGEST()`` on PostgreSQL."""

    def as_postgresql(self, compiler, connection, **extra_context):
        """Delegate to the next ``as_sql()`` in the MRO with a DIGEST template.

        The algorithm name passed to ``DIGEST()`` is ``self.function`` in
        lowercase, and the digest is hex-encoded. Extra keyword arguments are
        forwarded unchanged. The var-keyword is named ``extra_context`` to
        match the other compiler hooks in this module.
        """
        return super().as_sql(
            compiler,
            connection,
            template="ENCODE(DIGEST(%(expressions)s, '%(function)s'), 'hex')",
            function=self.function.lower(),
            **extra_context,
        )
class Chr(Transform):
    """Transform compiled as ``CHR(...)`` by default, with per-backend SQL."""

    lookup_name = "chr"
    function = "CHR"

    def as_sqlite(self, compiler, connection, **extra_context):
        """Compile for SQLite, where the function is named ``CHAR``."""
        return super().as_sql(compiler, connection, function="CHAR", **extra_context)

    def as_oracle(self, compiler, connection, **extra_context):
        """Compile for Oracle, adding ``USING NCHAR_CS`` inside the call."""
        oracle_template = "%(function)s(%(expressions)s USING NCHAR_CS)"
        return super().as_sql(
            compiler, connection, template=oracle_template, **extra_context
        )

    def as_mysql(self, compiler, connection, **extra_context):
        """Compile for MySQL as ``CHAR(... USING utf16)``."""
        mysql_template = "%(function)s(%(expressions)s USING utf16)"
        return super().as_sql(
            compiler,
            connection,
            function="CHAR",
            template=mysql_template,
            **extra_context,
        )
class ConcatPair(Func):
    """
    Join exactly two arguments into one string. `Concat` builds on this
    class because some database backends do not accept more than two
    arguments.
    """

    function = "CONCAT"

    def coalesce(self):
        """Return a copy whose source expressions are each NULL-guarded."""
        # A NULL operand would make the whole expression NULL, so every
        # operand is wrapped in Coalesce(..., "").
        clone = self.copy()
        guarded = []
        for source in clone.get_source_expressions():
            guarded.append(Coalesce(source, Value("")))
        clone.set_source_expressions(guarded)
        return clone

    def as_mysql(self, compiler, connection, **extra_context):
        """Compile for MySQL via ``CONCAT_WS`` with an empty separator."""
        # CONCAT_WS with an empty separator ignores NULLs.
        mysql_template = "%(function)s('', %(expressions)s)"
        return super().as_sql(
            compiler,
            connection,
            function="CONCAT_WS",
            template=mysql_template,
            **extra_context,
        )

    def as_sqlite(self, compiler, connection, **extra_context):
        """Compile the coalesced copy for SQLite using the ``||`` operator."""
        return super(ConcatPair, self.coalesce()).as_sql(
            compiler,
            connection,
            template="%(expressions)s",
            arg_joiner=" || ",
            **extra_context,
        )
class Concat(Func):
    """
    Join several text expressions into one. On backends where a single null
    argument makes the whole expression null, every argument gets wrapped in
    a coalesce function so the result is never null.
    """

    template = "%(expressions)s"
    function = None

    def __init__(self, *expressions, **extra):
        """Nest ``expressions`` into ConcatPair objects.

        Raises ValueError when fewer than two expressions are supplied.
        """
        if len(expressions) < 2:
            raise ValueError("Concat must take at least two expressions")
        super().__init__(self._paired(expressions), **extra)

    def _paired(self, expressions):
        """Fold ``expressions`` into right-nested ConcatPair objects.

        For [a, b, c, d] the result is
        ConcatPair(a, ConcatPair(b, ConcatPair(c, d))).
        """
        head, tail = expressions[0], expressions[1:]
        if len(tail) == 1:
            # Exactly two expressions left: this is the innermost pair.
            return ConcatPair(head, tail[0])
        return ConcatPair(head, self._paired(tail))
class Left(Func):
    """Two-argument ``LEFT(expression, length)`` function with CharField output."""

    output_field = CharField()
    arity = 2
    function = "LEFT"

    def __init__(self, expression, length, **extra):
        """
        expression: the name of a field, or an expression returning a string
        length: the number of characters to return from the start of the string

        Raises ValueError when ``length`` is a plain value smaller than 1.
        """
        # Only plain values are validated; objects exposing
        # resolve_expression are passed through unchecked.
        if not hasattr(length, "resolve_expression") and length < 1:
            raise ValueError("'length' must be greater than 0.")
        super().__init__(expression, length, **extra)

    def get_substr(self):
        """Build the equivalent Substr expression starting at position 1."""
        text = self.source_expressions[0]
        count = self.source_expressions[1]
        return Substr(text, Value(1), count)

    def as_sqlite(self, compiler, connection, **extra_context):
        """Compile for SQLite through the Substr equivalent."""
        substr = self.get_substr()
        return substr.as_sqlite(compiler, connection, **extra_context)

    def as_oracle(self, compiler, connection, **extra_context):
        """Compile for Oracle through the Substr equivalent."""
        substr = self.get_substr()
        return substr.as_oracle(compiler, connection, **extra_context)
class Length(Transform):
    """Give the character count of the expression as an integer."""

    output_field = IntegerField()
    lookup_name = "length"
    function = "LENGTH"

    def as_mysql(self, compiler, connection, **extra_context):
        """Compile for MySQL using ``CHAR_LENGTH`` instead of ``LENGTH``."""
        mysql_function = "CHAR_LENGTH"
        return super().as_sql(
            compiler, connection, function=mysql_function, **extra_context
        )
class Lower(Transform):
    """Transform compiled with the SQL ``LOWER`` function; lookup ``lower``."""

    lookup_name = "lower"
    function = "LOWER"
class LPad(Func):
    """``LPAD(expression, length, fill_text)`` function with CharField output."""

    output_field = CharField()
    function = "LPAD"

    def __init__(self, expression, length, fill_text=Value(" "), **extra):
        """Validate a plain ``length`` and store the three arguments.

        Raises ValueError when ``length`` is a plain, non-None value below 0.
        ``fill_text`` defaults to a single space.
        """
        is_plain_value = not hasattr(length, "resolve_expression")
        if is_plain_value and length is not None and length < 0:
            raise ValueError("'length' must be greater or equal to 0.")
        super().__init__(expression, length, fill_text, **extra)
class LTrim(Transform):
    """Transform compiled with the SQL ``LTRIM`` function; lookup ``ltrim``."""

    lookup_name = "ltrim"
    function = "LTRIM"
class MD5(OracleHashMixin, Transform):
    """Transform compiled with the SQL ``MD5`` function; lookup ``md5``.

    Oracle compilation comes from OracleHashMixin.
    """

    lookup_name = "md5"
    function = "MD5"
class Ord(Transform):
    """Transform compiled as ``ASCII(...)`` by default, with integer output."""

    output_field = IntegerField()
    lookup_name = "ord"
    function = "ASCII"

    def as_sqlite(self, compiler, connection, **extra_context):
        """Compile for SQLite, where the function is named ``UNICODE``."""
        sqlite_function = "UNICODE"
        return super().as_sql(
            compiler, connection, function=sqlite_function, **extra_context
        )

    def as_mysql(self, compiler, connection, **extra_context):
        """Compile for MySQL, where the function is named ``ORD``."""
        mysql_function = "ORD"
        return super().as_sql(
            compiler, connection, function=mysql_function, **extra_context
        )
class Repeat(Func):
    """``REPEAT(expression, number)`` function with CharField output."""

    output_field = CharField()
    function = "REPEAT"

    def __init__(self, expression, number, **extra):
        """Validate a plain ``number`` and store both arguments.

        Raises ValueError when ``number`` is a plain, non-None value below 0.
        """
        is_plain_value = not hasattr(number, "resolve_expression")
        if is_plain_value and number is not None and number < 0:
            raise ValueError("'number' must be greater or equal to 0.")
        super().__init__(expression, number, **extra)

    def as_oracle(self, compiler, connection, **extra_context):
        """Compile for Oracle by padding the expression with itself.

        The RPad target length is ``Length(expression) * number``, or None
        when ``number`` is None.
        """
        expression, number = self.source_expressions
        if number is None:
            target_length = None
        else:
            target_length = Length(expression) * number
        padded = RPad(expression, target_length, expression)
        return padded.as_sql(compiler, connection, **extra_context)
class Replace(Func):
    """``REPLACE(expression, text, replacement)`` function."""

    function = "REPLACE"

    def __init__(self, expression, text, replacement=Value(""), **extra):
        """Store the three arguments; ``replacement`` defaults to ``Value("")``."""
        arguments = (expression, text, replacement)
        super().__init__(*arguments, **extra)
class Reverse(Transform):
    """Transform compiled with the SQL ``REVERSE`` function; lookup ``reverse``."""

    lookup_name = "reverse"
    function = "REVERSE"

    def as_oracle(self, compiler, connection, **extra_context):
        """Compile for Oracle using a subquery in place of ``REVERSE``."""
        # Oracle's REVERSE is undocumented and does not handle multi-byte
        # strings, so the characters are split out one per row and
        # re-aggregated in descending position order.
        oracle_template = (
            "(SELECT LISTAGG(s) WITHIN GROUP (ORDER BY n DESC) FROM "
            "(SELECT LEVEL n, SUBSTR(%(expressions)s, LEVEL, 1) s "
            "FROM DUAL CONNECT BY LEVEL <= LENGTH(%(expressions)s)) "
            "GROUP BY %(expressions)s)"
        )
        return super().as_sql(
            compiler, connection, template=oracle_template, **extra_context
        )
class Right(Left):
    """Variant of Left compiled with the SQL ``RIGHT`` function."""

    function = "RIGHT"

    def get_substr(self):
        """Build the equivalent Substr, using the negated length as position."""
        text = self.source_expressions[0]
        negated_length = self.source_expressions[1] * Value(-1)
        return Substr(text, negated_length)
class RPad(LPad):
    """Variant of LPad compiled with the SQL ``RPAD`` function."""

    function = "RPAD"
class RTrim(Transform):
    """Transform compiled with the SQL ``RTRIM`` function; lookup ``rtrim``."""

    lookup_name = "rtrim"
    function = "RTRIM"
class SHA1(OracleHashMixin, PostgreSQLSHAMixin, Transform):
    """Transform compiled with the SQL ``SHA1`` function; lookup ``sha1``.

    Oracle and PostgreSQL compilation come from the mixins.
    """

    lookup_name = "sha1"
    function = "SHA1"
class SHA224(MySQLSHA2Mixin, PostgreSQLSHAMixin, Transform):
    """Transform compiled with the SQL ``SHA224`` function; lookup ``sha224``.

    MySQL and PostgreSQL compilation come from the mixins; Oracle is
    rejected explicitly.
    """

    lookup_name = "sha224"
    function = "SHA224"

    def as_oracle(self, compiler, connection, **extra_context):
        """Always raise NotSupportedError."""
        raise NotSupportedError("SHA224 is not supported on Oracle.")
class SHA256(MySQLSHA2Mixin, OracleHashMixin, PostgreSQLSHAMixin, Transform):
    """Transform compiled with the SQL ``SHA256`` function; lookup ``sha256``.

    MySQL, Oracle and PostgreSQL compilation come from the mixins.
    """

    lookup_name = "sha256"
    function = "SHA256"
class SHA384(MySQLSHA2Mixin, OracleHashMixin, PostgreSQLSHAMixin, Transform):
    """Transform compiled with the SQL ``SHA384`` function; lookup ``sha384``.

    MySQL, Oracle and PostgreSQL compilation come from the mixins.
    """

    lookup_name = "sha384"
    function = "SHA384"
class SHA512(MySQLSHA2Mixin, OracleHashMixin, PostgreSQLSHAMixin, Transform):
    """Transform compiled with the SQL ``SHA512`` function; lookup ``sha512``.

    MySQL, Oracle and PostgreSQL compilation come from the mixins.
    """

    lookup_name = "sha512"
    function = "SHA512"
class StrIndex(Func):
    """
    Give the 1-indexed position at which a substring first occurs inside
    another string, as a positive integer, or 0 when the substring does not
    occur.
    """

    output_field = IntegerField()
    arity = 2
    function = "INSTR"

    def as_postgresql(self, compiler, connection, **extra_context):
        """Compile for PostgreSQL using ``STRPOS`` instead of ``INSTR``."""
        postgres_function = "STRPOS"
        return super().as_sql(
            compiler, connection, function=postgres_function, **extra_context
        )
class Substr(Func):
    """``SUBSTRING(expression, pos[, length])`` function with CharField output."""

    output_field = CharField()
    function = "SUBSTRING"

    def __init__(self, expression, pos, length=None, **extra):
        """
        expression: the name of a field, or an expression returning a string
        pos: an integer > 0, or an expression returning an integer
        length: an optional number of characters to return

        Raises ValueError when ``pos`` is a plain value smaller than 1.
        """
        # Only plain values are validated; objects exposing
        # resolve_expression are passed through unchecked.
        if not hasattr(pos, "resolve_expression") and pos < 1:
            raise ValueError("'pos' must be greater than 0")
        # The length argument is omitted entirely when it is None.
        if length is None:
            expressions = (expression, pos)
        else:
            expressions = (expression, pos, length)
        super().__init__(*expressions, **extra)

    def as_oracle(self, compiler, connection, **extra_context):
        """Compile for Oracle, where the function is named ``SUBSTR``."""
        return super().as_sql(compiler, connection, function="SUBSTR", **extra_context)

    def as_sqlite(self, compiler, connection, **extra_context):
        """Compile for SQLite, where the function is named ``SUBSTR``."""
        return super().as_sql(compiler, connection, function="SUBSTR", **extra_context)
class Trim(Transform):
    """Transform compiled with the SQL ``TRIM`` function; lookup ``trim``."""

    lookup_name = "trim"
    function = "TRIM"
class Upper(Transform):
    """Transform compiled with the SQL ``UPPER`` function; lookup ``upper``."""

    lookup_name = "upper"
    function = "UPPER"