Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/db/models/functions/text.py: 70%

165 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1from django.db import NotSupportedError 

2from django.db.models.expressions import Func, Value 

3from django.db.models.fields import CharField, IntegerField 

4from django.db.models.functions import Coalesce 

5from django.db.models.lookups import Transform 

6 

7 

class MySQLSHA2Mixin:
    """
    Mixin that compiles a hash expression for MySQL through ``SHA2()``.

    The host class must define ``function`` and have a parent class that
    provides ``as_sql()``.
    """

    def as_mysql(self, compiler, connection, **extra_context):
        """
        Compile using a ``SHA2(<expressions>, <suffix>)`` template.

        ``<suffix>`` is whatever follows the first three characters of
        ``self.function`` (e.g. ``"256"`` for ``"SHA256"``). Extra keyword
        arguments are forwarded unchanged to the parent ``as_sql()``.
        """
        # "%%" collapses to a literal "%" during the interpolation below, so
        # the "%(expressions)s" placeholder is left intact for as_sql().
        suffix = self.function[3:]
        return super().as_sql(
            compiler,
            connection,
            template="SHA2(%%(expressions)s, %s)" % suffix,
            **extra_context,
        )

16 

17 

class OracleHashMixin:
    """
    Mixin that compiles a hash expression for Oracle.

    The host class must have a parent class that provides ``as_sql()``.
    """

    def as_oracle(self, compiler, connection, **extra_context):
        """Compile with a STANDARD_HASH-based template; extras pass through."""
        # The template converts the expressions to raw AL32UTF8, hashes them
        # with STANDARD_HASH, and lower-cases the RAWTOHEX output. The
        # "%(function)s" placeholder is left for as_sql() to fill in.
        oracle_template = (
            "LOWER(RAWTOHEX(STANDARD_HASH(UTL_I18N.STRING_TO_RAW("
            "%(expressions)s, 'AL32UTF8'), '%(function)s')))"
        )
        return super().as_sql(
            compiler, connection, template=oracle_template, **extra_context
        )

29 

30 

class PostgreSQLSHAMixin:
    """
    Mixin that compiles a hash expression for PostgreSQL.

    The host class must define ``function`` and have a parent class that
    provides ``as_sql()``.
    """

    def as_postgresql(self, compiler, connection, **extra_context):
        """
        Compile using an ``ENCODE(DIGEST(...), 'hex')`` template.

        The lower-cased ``self.function`` is passed as the ``function``
        keyword, which the template quotes as DIGEST's second argument.
        Extra keyword arguments are forwarded unchanged to the parent
        ``as_sql()``.
        """
        algorithm = self.function.lower()
        return super().as_sql(
            compiler,
            connection,
            template="ENCODE(DIGEST(%(expressions)s, '%(function)s'), 'hex')",
            function=algorithm,
            **extra_context,
        )

40 

41 

class Chr(Transform):
    """
    Transform with lookup name ``chr`` that uses the SQL function ``CHR``.

    MySQL, Oracle and SQLite are compiled by the vendor-specific methods
    below.
    """

    lookup_name = "chr"
    function = "CHR"

    def as_mysql(self, compiler, connection, **extra_context):
        """Compile with function ``CHAR`` and a ``USING utf16`` template."""
        mysql_template = "%(function)s(%(expressions)s USING utf16)"
        return super().as_sql(
            compiler,
            connection,
            function="CHAR",
            template=mysql_template,
            **extra_context,
        )

    def as_oracle(self, compiler, connection, **extra_context):
        """Compile with a template that appends ``USING NCHAR_CS``."""
        oracle_template = "%(function)s(%(expressions)s USING NCHAR_CS)"
        return super().as_sql(
            compiler, connection, template=oracle_template, **extra_context
        )

    def as_sqlite(self, compiler, connection, **extra_context):
        """Compile with function ``CHAR`` and the default template."""
        return super().as_sql(
            compiler, connection, function="CHAR", **extra_context
        )

65 

66 

class ConcatPair(Func):
    """
    Join exactly two arguments. `Concat` builds on this class because some
    database backends do not accept more than two arguments.
    """

    function = "CONCAT"

    def as_sqlite(self, compiler, connection, **extra_context):
        """Compile the coalesced copy as its expressions joined by `` || ``."""
        null_safe = self.coalesce()
        # Call the parent's as_sql() on the coalesced copy, not on self.
        parent_view = super(ConcatPair, null_safe)
        return parent_view.as_sql(
            compiler,
            connection,
            template="%(expressions)s",
            arg_joiner=" || ",
            **extra_context,
        )

    def as_mysql(self, compiler, connection, **extra_context):
        """Compile as ``CONCAT_WS('', <expressions>)``."""
        # CONCAT_WS with an empty separator is used so that NULLs are ignored.
        return super().as_sql(
            compiler,
            connection,
            function="CONCAT_WS",
            template="%(function)s('', %(expressions)s)",
            **extra_context,
        )

    def coalesce(self):
        """Return a copy whose source expressions are each wrapped in Coalesce."""
        # A null on either side would make the whole expression null, so each
        # source expression falls back to an empty string.
        clone = self.copy()
        wrapped = []
        for source in clone.get_source_expressions():
            wrapped.append(Coalesce(source, Value("")))
        clone.set_source_expressions(wrapped)
        return clone

105 

106 

class Concat(Func):
    """
    Join text fields into one value. On backends where a single null
    argument makes the whole expression null, every argument is wrapped in
    a coalesce function so that the result is non-null.
    """

    template = "%(expressions)s"
    function = None

    def __init__(self, *expressions, **extra):
        """
        Nest the given expressions into ConcatPair objects.

        Raises ValueError when fewer than two expressions are given.
        """
        if len(expressions) < 2:
            raise ValueError("Concat must take at least two expressions")
        super().__init__(self._paired(expressions), **extra)

    def _paired(self, expressions):
        """Recursively fold the expressions into right-nested ConcatPairs."""
        # [a, b, c, d] -> ConcatPair(a, ConcatPair(b, ConcatPair(c, d)))
        first = expressions[0]
        remaining = expressions[1:]
        if len(remaining) == 1:
            # Exactly two expressions left: this is the innermost pair.
            return ConcatPair(first, remaining[0])
        return ConcatPair(first, self._paired(remaining))

130 

131 

class Left(Func):
    """
    Two-argument function ``LEFT`` with a CharField output.

    Oracle and SQLite are compiled through the Substr expression built by
    ``get_substr()`` instead.
    """

    output_field = CharField()
    arity = 2
    function = "LEFT"

    def __init__(self, expression, length, **extra):
        """
        expression: a field name, or an expression that returns a string
        length: how many characters to return from the start of the string

        Raises ValueError when ``length`` is not an expression (it has no
        ``resolve_expression`` attribute) and is less than 1.
        """
        is_expression = hasattr(length, "resolve_expression")
        if not is_expression and length < 1:
            raise ValueError("'length' must be greater than 0.")
        super().__init__(expression, length, **extra)

    def get_substr(self):
        """Return a Substr over the same source, from position 1, for the length."""
        sources = self.source_expressions
        return Substr(sources[0], Value(1), sources[1])

    def as_oracle(self, compiler, connection, **extra_context):
        """Delegate to the Oracle compilation of the Substr fallback."""
        fallback = self.get_substr()
        return fallback.as_oracle(compiler, connection, **extra_context)

    def as_sqlite(self, compiler, connection, **extra_context):
        """Delegate to the SQLite compilation of the Substr fallback."""
        fallback = self.get_substr()
        return fallback.as_sqlite(compiler, connection, **extra_context)

155 

156 

class Length(Transform):
    """Return the number of characters in the expression."""

    output_field = IntegerField()
    lookup_name = "length"
    function = "LENGTH"

    def as_mysql(self, compiler, connection, **extra_context):
        """Compile with ``CHAR_LENGTH`` as the function name."""
        mysql_function = "CHAR_LENGTH"
        return super().as_sql(
            compiler, connection, function=mysql_function, **extra_context
        )

168 

169 

class Lower(Transform):
    """Transform with lookup name ``lower`` using the SQL function ``LOWER``."""

    lookup_name = "lower"
    function = "LOWER"

173 

174 

class LPad(Func):
    """Function ``LPAD`` with a CharField output."""

    output_field = CharField()
    function = "LPAD"

    def __init__(self, expression, length, fill_text=Value(" "), **extra):
        """
        Pass expression, length and fill_text on as the source expressions.

        Raises ValueError when ``length`` is a plain value (it has no
        ``resolve_expression`` attribute), is not None, and is negative.
        """
        is_plain_value = not hasattr(length, "resolve_expression")
        if is_plain_value and length is not None and length < 0:
            raise ValueError("'length' must be greater or equal to 0.")
        super().__init__(expression, length, fill_text, **extra)

187 

188 

class LTrim(Transform):
    """Transform with lookup name ``ltrim`` using the SQL function ``LTRIM``."""

    lookup_name = "ltrim"
    function = "LTRIM"

192 

193 

class MD5(OracleHashMixin, Transform):
    """
    Transform with lookup name ``md5`` using the SQL function ``MD5``.

    Oracle compilation comes from OracleHashMixin.
    """

    lookup_name = "md5"
    function = "MD5"

197 

198 

class Ord(Transform):
    """
    Transform with lookup name ``ord`` and an IntegerField output.

    The default SQL function is ``ASCII``; MySQL and SQLite substitute their
    own function names.
    """

    output_field = IntegerField()
    lookup_name = "ord"
    function = "ASCII"

    def as_mysql(self, compiler, connection, **extra_context):
        """Compile with ``ORD`` as the function name."""
        return super().as_sql(
            compiler, connection, function="ORD", **extra_context
        )

    def as_sqlite(self, compiler, connection, **extra_context):
        """Compile with ``UNICODE`` as the function name."""
        return super().as_sql(
            compiler, connection, function="UNICODE", **extra_context
        )

209 

210 

class Repeat(Func):
    """Function ``REPEAT`` with a CharField output."""

    output_field = CharField()
    function = "REPEAT"

    def __init__(self, expression, number, **extra):
        """
        Pass expression and number on as the source expressions.

        Raises ValueError when ``number`` is a plain value (it has no
        ``resolve_expression`` attribute), is not None, and is negative.
        """
        is_plain_value = not hasattr(number, "resolve_expression")
        if is_plain_value and number is not None and number < 0:
            raise ValueError("'number' must be greater or equal to 0.")
        super().__init__(expression, number, **extra)

    def as_oracle(self, compiler, connection, **extra_context):
        """Compile as an RPad of the expression, filled with itself."""
        expression, number = self.source_expressions
        # The padded length is the expression's length times the repeat count,
        # or None when the count itself is None.
        if number is None:
            target_length = None
        else:
            target_length = Length(expression) * number
        padded = RPad(expression, target_length, expression)
        return padded.as_sql(compiler, connection, **extra_context)

229 

230 

class Replace(Func):
    """Function ``REPLACE``; the replacement defaults to an empty-string Value."""

    function = "REPLACE"

    def __init__(self, expression, text, replacement=Value(""), **extra):
        """Pass expression, text and replacement on as the source expressions."""
        sources = (expression, text, replacement)
        super().__init__(*sources, **extra)

236 

237 

class Reverse(Transform):
    """Transform with lookup name ``reverse`` using the SQL function ``REVERSE``."""

    lookup_name = "reverse"
    function = "REVERSE"

    def as_oracle(self, compiler, connection, **extra_context):
        """Compile with a subquery template instead of Oracle's REVERSE."""
        # Oracle's REVERSE is undocumented and does not support multi-byte
        # strings, so a special subquery is used in its place.
        subquery_template = (
            "(SELECT LISTAGG(s) WITHIN GROUP (ORDER BY n DESC) FROM "
            "(SELECT LEVEL n, SUBSTR(%(expressions)s, LEVEL, 1) s "
            "FROM DUAL CONNECT BY LEVEL <= LENGTH(%(expressions)s)) "
            "GROUP BY %(expressions)s)"
        )
        return super().as_sql(
            compiler, connection, template=subquery_template, **extra_context
        )

256 

257 

class Right(Left):
    """Left subclass that uses ``RIGHT`` and overrides the Substr fallback."""

    function = "RIGHT"

    def get_substr(self):
        """Return a Substr over the same source, starting at minus the length."""
        sources = self.source_expressions
        negated_length = sources[1] * Value(-1)
        return Substr(sources[0], negated_length)

265 

266 

class RPad(LPad):
    """LPad subclass that uses the SQL function ``RPAD``."""

    function = "RPAD"

269 

270 

class RTrim(Transform):
    """Transform with lookup name ``rtrim`` using the SQL function ``RTRIM``."""

    lookup_name = "rtrim"
    function = "RTRIM"

274 

275 

class SHA1(OracleHashMixin, PostgreSQLSHAMixin, Transform):
    """
    Transform with lookup name ``sha1`` using the SQL function ``SHA1``.

    Oracle and PostgreSQL compilation come from the mixins.
    """

    lookup_name = "sha1"
    function = "SHA1"

279 

280 

class SHA224(MySQLSHA2Mixin, PostgreSQLSHAMixin, Transform):
    """
    Transform with lookup name ``sha224`` using the SQL function ``SHA224``.

    MySQL and PostgreSQL compilation come from the mixins; Oracle is
    rejected.
    """

    lookup_name = "sha224"
    function = "SHA224"

    def as_oracle(self, compiler, connection, **extra_context):
        """Always raise NotSupportedError; the arguments are not used."""
        raise NotSupportedError("SHA224 is not supported on Oracle.")

287 

288 

class SHA256(MySQLSHA2Mixin, OracleHashMixin, PostgreSQLSHAMixin, Transform):
    """
    Transform with lookup name ``sha256`` using the SQL function ``SHA256``.

    MySQL, Oracle and PostgreSQL compilation come from the mixins.
    """

    lookup_name = "sha256"
    function = "SHA256"

292 

293 

class SHA384(MySQLSHA2Mixin, OracleHashMixin, PostgreSQLSHAMixin, Transform):
    """
    Transform with lookup name ``sha384`` using the SQL function ``SHA384``.

    MySQL, Oracle and PostgreSQL compilation come from the mixins.
    """

    lookup_name = "sha384"
    function = "SHA384"

297 

298 

class SHA512(MySQLSHA2Mixin, OracleHashMixin, PostgreSQLSHAMixin, Transform):
    """
    Transform with lookup name ``sha512`` using the SQL function ``SHA512``.

    MySQL, Oracle and PostgreSQL compilation come from the mixins.
    """

    lookup_name = "sha512"
    function = "SHA512"

302 

303 

class StrIndex(Func):
    """
    Return the 1-indexed position (a positive integer) at which a substring
    first occurs inside another string, or 0 when the substring does not
    occur.
    """

    output_field = IntegerField()
    arity = 2
    function = "INSTR"

    def as_postgresql(self, compiler, connection, **extra_context):
        """Compile with ``STRPOS`` as the function name."""
        postgres_function = "STRPOS"
        return super().as_sql(
            compiler, connection, function=postgres_function, **extra_context
        )

317 

318 

class Substr(Func):
    """
    Function ``SUBSTRING`` with a CharField output.

    SQLite and Oracle are compiled with the function name ``SUBSTR``.
    """

    output_field = CharField()
    function = "SUBSTRING"

    def __init__(self, expression, pos, length=None, **extra):
        """
        expression: a field name, or an expression that returns a string
        pos: an integer greater than 0, or an expression returning an integer
        length: optional number of characters to return

        Raises ValueError when ``pos`` is not an expression (it has no
        ``resolve_expression`` attribute) and is less than 1.
        """
        if not hasattr(pos, "resolve_expression") and pos < 1:
            raise ValueError("'pos' must be greater than 0")
        # The length only becomes a source expression when one was supplied.
        if length is None:
            sources = (expression, pos)
        else:
            sources = (expression, pos, length)
        super().__init__(*sources, **extra)

    def as_sqlite(self, compiler, connection, **extra_context):
        """Compile with ``SUBSTR`` as the function name."""
        return super().as_sql(
            compiler, connection, function="SUBSTR", **extra_context
        )

    def as_oracle(self, compiler, connection, **extra_context):
        """Compile with ``SUBSTR`` as the function name."""
        return super().as_sql(
            compiler, connection, function="SUBSTR", **extra_context
        )

342 

343 

class Trim(Transform):
    """Transform with lookup name ``trim`` using the SQL function ``TRIM``."""

    lookup_name = "trim"
    function = "TRIM"

347 

348 

class Upper(Transform):
    """Transform with lookup name ``upper`` using the SQL function ``UPPER``."""

    lookup_name = "upper"
    function = "UPPER"