Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/db/models/functions/comparison.py: 35%

114 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1"""Database functions that do comparisons or type conversions.""" 

2from django.db import NotSupportedError 

3from django.db.models.expressions import Func, Value 

4from django.db.models.fields.json import JSONField 

5from django.utils.regex_helper import _lazy_re_compile 

6 

7 

8class Cast(Func): 

9 """Coerce an expression to a new field type.""" 

10 

11 function = "CAST" 

12 template = "%(function)s(%(expressions)s AS %(db_type)s)" 

13 

14 def __init__(self, expression, output_field): 

15 super().__init__(expression, output_field=output_field) 

16 

17 def as_sql(self, compiler, connection, **extra_context): 

18 extra_context["db_type"] = self.output_field.cast_db_type(connection) 

19 return super().as_sql(compiler, connection, **extra_context) 

20 

21 def as_sqlite(self, compiler, connection, **extra_context): 

22 db_type = self.output_field.db_type(connection) 

23 if db_type in {"datetime", "time"}: 

24 # Use strftime as datetime/time don't keep fractional seconds. 

25 template = "strftime(%%s, %(expressions)s)" 

26 sql, params = super().as_sql( 

27 compiler, connection, template=template, **extra_context 

28 ) 

29 format_string = "%H:%M:%f" if db_type == "time" else "%Y-%m-%d %H:%M:%f" 

30 params.insert(0, format_string) 

31 return sql, params 

32 elif db_type == "date": 

33 template = "date(%(expressions)s)" 

34 return super().as_sql( 

35 compiler, connection, template=template, **extra_context 

36 ) 

37 return self.as_sql(compiler, connection, **extra_context) 

38 

39 def as_mysql(self, compiler, connection, **extra_context): 

40 template = None 

41 output_type = self.output_field.get_internal_type() 

42 # MySQL doesn't support explicit cast to float. 

43 if output_type == "FloatField": 

44 template = "(%(expressions)s + 0.0)" 

45 # MariaDB doesn't support explicit cast to JSON. 

46 elif output_type == "JSONField" and connection.mysql_is_mariadb: 

47 template = "JSON_EXTRACT(%(expressions)s, '$')" 

48 return self.as_sql(compiler, connection, template=template, **extra_context) 

49 

50 def as_postgresql(self, compiler, connection, **extra_context): 

51 # CAST would be valid too, but the :: shortcut syntax is more readable. 

52 # 'expressions' is wrapped in parentheses in case it's a complex 

53 # expression. 

54 return self.as_sql( 

55 compiler, 

56 connection, 

57 template="(%(expressions)s)::%(db_type)s", 

58 **extra_context, 

59 ) 

60 

61 def as_oracle(self, compiler, connection, **extra_context): 

62 if self.output_field.get_internal_type() == "JSONField": 

63 # Oracle doesn't support explicit cast to JSON. 

64 template = "JSON_QUERY(%(expressions)s, '$')" 

65 return super().as_sql( 

66 compiler, connection, template=template, **extra_context 

67 ) 

68 return self.as_sql(compiler, connection, **extra_context) 

69 

70 

71class Coalesce(Func): 

72 """Return, from left to right, the first non-null expression.""" 

73 

74 function = "COALESCE" 

75 

76 def __init__(self, *expressions, **extra): 

77 if len(expressions) < 2: 

78 raise ValueError("Coalesce must take at least two expressions") 

79 super().__init__(*expressions, **extra) 

80 

81 @property 

82 def empty_result_set_value(self): 

83 for expression in self.get_source_expressions(): 

84 result = expression.empty_result_set_value 

85 if result is NotImplemented or result is not None: 

86 return result 

87 return None 

88 

89 def as_oracle(self, compiler, connection, **extra_context): 

90 # Oracle prohibits mixing TextField (NCLOB) and CharField (NVARCHAR2), 

91 # so convert all fields to NCLOB when that type is expected. 

92 if self.output_field.get_internal_type() == "TextField": 

93 clone = self.copy() 

94 clone.set_source_expressions( 

95 [ 

96 Func(expression, function="TO_NCLOB") 

97 for expression in self.get_source_expressions() 

98 ] 

99 ) 

100 return super(Coalesce, clone).as_sql(compiler, connection, **extra_context) 

101 return self.as_sql(compiler, connection, **extra_context) 

102 

103 

104class Collate(Func): 

105 function = "COLLATE" 

106 template = "%(expressions)s %(function)s %(collation)s" 

107 # Inspired from 

108 # https://www.postgresql.org/docs/current/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS 

109 collation_re = _lazy_re_compile(r"^[\w\-]+$") 

110 

111 def __init__(self, expression, collation): 

112 if not (collation and self.collation_re.match(collation)): 

113 raise ValueError("Invalid collation name: %r." % collation) 

114 self.collation = collation 

115 super().__init__(expression) 

116 

117 def as_sql(self, compiler, connection, **extra_context): 

118 extra_context.setdefault("collation", connection.ops.quote_name(self.collation)) 

119 return super().as_sql(compiler, connection, **extra_context) 

120 

121 

122class Greatest(Func): 

123 """ 

124 Return the maximum expression. 

125 

126 If any expression is null the return value is database-specific: 

127 On PostgreSQL, the maximum not-null expression is returned. 

128 On MySQL, Oracle, and SQLite, if any expression is null, null is returned. 

129 """ 

130 

131 function = "GREATEST" 

132 

133 def __init__(self, *expressions, **extra): 

134 if len(expressions) < 2: 

135 raise ValueError("Greatest must take at least two expressions") 

136 super().__init__(*expressions, **extra) 

137 

138 def as_sqlite(self, compiler, connection, **extra_context): 

139 """Use the MAX function on SQLite.""" 

140 return super().as_sqlite(compiler, connection, function="MAX", **extra_context) 

141 

142 

143class JSONObject(Func): 

144 function = "JSON_OBJECT" 

145 output_field = JSONField() 

146 

147 def __init__(self, **fields): 

148 expressions = [] 

149 for key, value in fields.items(): 

150 expressions.extend((Value(key), value)) 

151 super().__init__(*expressions) 

152 

153 def as_sql(self, compiler, connection, **extra_context): 

154 if not connection.features.has_json_object_function: 

155 raise NotSupportedError( 

156 "JSONObject() is not supported on this database backend." 

157 ) 

158 return super().as_sql(compiler, connection, **extra_context) 

159 

160 def as_postgresql(self, compiler, connection, **extra_context): 

161 return self.as_sql( 

162 compiler, 

163 connection, 

164 function="JSONB_BUILD_OBJECT", 

165 **extra_context, 

166 ) 

167 

168 def as_oracle(self, compiler, connection, **extra_context): 

169 class ArgJoiner: 

170 def join(self, args): 

171 args = [" VALUE ".join(arg) for arg in zip(args[::2], args[1::2])] 

172 return ", ".join(args) 

173 

174 return self.as_sql( 

175 compiler, 

176 connection, 

177 arg_joiner=ArgJoiner(), 

178 template="%(function)s(%(expressions)s RETURNING CLOB)", 

179 **extra_context, 

180 ) 

181 

182 

183class Least(Func): 

184 """ 

185 Return the minimum expression. 

186 

187 If any expression is null the return value is database-specific: 

188 On PostgreSQL, return the minimum not-null expression. 

189 On MySQL, Oracle, and SQLite, if any expression is null, return null. 

190 """ 

191 

192 function = "LEAST" 

193 

194 def __init__(self, *expressions, **extra): 

195 if len(expressions) < 2: 

196 raise ValueError("Least must take at least two expressions") 

197 super().__init__(*expressions, **extra) 

198 

199 def as_sqlite(self, compiler, connection, **extra_context): 

200 """Use the MIN function on SQLite.""" 

201 return super().as_sqlite(compiler, connection, function="MIN", **extra_context) 

202 

203 

204class NullIf(Func): 

205 function = "NULLIF" 

206 arity = 2 

207 

208 def as_oracle(self, compiler, connection, **extra_context): 

209 expression1 = self.get_source_expressions()[0] 

210 if isinstance(expression1, Value) and expression1.value is None: 

211 raise ValueError("Oracle does not allow Value(None) for expression1.") 

212 return super().as_sql(compiler, connection, **extra_context)