Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/db/backends/postgresql/operations.py: 36%

162 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1from psycopg2.extras import Inet 

2 

3from django.conf import settings 

4from django.db.backends.base.operations import BaseDatabaseOperations 

5from django.db.backends.utils import split_tzname_delta 

6 

7 

8class DatabaseOperations(BaseDatabaseOperations): 

9 cast_char_field_without_max_length = "varchar" 

10 explain_prefix = "EXPLAIN" 

11 explain_options = frozenset( 

12 [ 

13 "ANALYZE", 

14 "BUFFERS", 

15 "COSTS", 

16 "SETTINGS", 

17 "SUMMARY", 

18 "TIMING", 

19 "VERBOSE", 

20 "WAL", 

21 ] 

22 ) 

23 cast_data_types = { 

24 "AutoField": "integer", 

25 "BigAutoField": "bigint", 

26 "SmallAutoField": "smallint", 

27 } 

28 

29 def unification_cast_sql(self, output_field): 

30 internal_type = output_field.get_internal_type() 

31 if internal_type in ( 

32 "GenericIPAddressField", 

33 "IPAddressField", 

34 "TimeField", 

35 "UUIDField", 

36 ): 

37 # PostgreSQL will resolve a union as type 'text' if input types are 

38 # 'unknown'. 

39 # https://www.postgresql.org/docs/current/typeconv-union-case.html 

40 # These fields cannot be implicitly cast back in the default 

41 # PostgreSQL configuration so we need to explicitly cast them. 

42 # We must also remove components of the type within brackets: 

43 # varchar(255) -> varchar. 

44 return ( 

45 "CAST(%%s AS %s)" % output_field.db_type(self.connection).split("(")[0] 

46 ) 

47 return "%s" 

48 

49 def date_extract_sql(self, lookup_type, field_name): 

50 # https://www.postgresql.org/docs/current/functions-datetime.html#FUNCTIONS-DATETIME-EXTRACT 

51 if lookup_type == "week_day": 

52 # For consistency across backends, we return Sunday=1, Saturday=7. 

53 return "EXTRACT('dow' FROM %s) + 1" % field_name 

54 elif lookup_type == "iso_week_day": 

55 return "EXTRACT('isodow' FROM %s)" % field_name 

56 elif lookup_type == "iso_year": 

57 return "EXTRACT('isoyear' FROM %s)" % field_name 

58 else: 

59 return "EXTRACT('%s' FROM %s)" % (lookup_type, field_name) 

60 

61 def date_trunc_sql(self, lookup_type, field_name, tzname=None): 

62 field_name = self._convert_field_to_tz(field_name, tzname) 

63 # https://www.postgresql.org/docs/current/functions-datetime.html#FUNCTIONS-DATETIME-TRUNC 

64 return "DATE_TRUNC('%s', %s)" % (lookup_type, field_name) 

65 

66 def _prepare_tzname_delta(self, tzname): 

67 tzname, sign, offset = split_tzname_delta(tzname) 

68 if offset:  # 68 ↛ 69: line 68 didn't jump to line 69, because the condition on line 68 was never true

69 sign = "-" if sign == "+" else "+" 

70 return f"{tzname}{sign}{offset}" 

71 return tzname 

72 

73 def _convert_field_to_tz(self, field_name, tzname): 

74 if tzname and settings.USE_TZ:  # 74 ↛ 79: line 74 didn't jump to line 79, because the condition on line 74 was never false

75 field_name = "%s AT TIME ZONE '%s'" % ( 

76 field_name, 

77 self._prepare_tzname_delta(tzname), 

78 ) 

79 return field_name 

80 

81 def datetime_cast_date_sql(self, field_name, tzname): 

82 field_name = self._convert_field_to_tz(field_name, tzname) 

83 return "(%s)::date" % field_name 

84 

85 def datetime_cast_time_sql(self, field_name, tzname): 

86 field_name = self._convert_field_to_tz(field_name, tzname) 

87 return "(%s)::time" % field_name 

88 

89 def datetime_extract_sql(self, lookup_type, field_name, tzname): 

90 field_name = self._convert_field_to_tz(field_name, tzname) 

91 return self.date_extract_sql(lookup_type, field_name) 

92 

93 def datetime_trunc_sql(self, lookup_type, field_name, tzname): 

94 field_name = self._convert_field_to_tz(field_name, tzname) 

95 # https://www.postgresql.org/docs/current/functions-datetime.html#FUNCTIONS-DATETIME-TRUNC 

96 return "DATE_TRUNC('%s', %s)" % (lookup_type, field_name) 

97 

98 def time_trunc_sql(self, lookup_type, field_name, tzname=None): 

99 field_name = self._convert_field_to_tz(field_name, tzname) 

100 return "DATE_TRUNC('%s', %s)::time" % (lookup_type, field_name) 

101 

102 def deferrable_sql(self): 

103 return " DEFERRABLE INITIALLY DEFERRED" 

104 

105 def fetch_returned_insert_rows(self, cursor): 

106 """ 

107 Given a cursor object that has just performed an INSERT...RETURNING 

108 statement into a table, return the tuple of returned data. 

109 """ 

110 return cursor.fetchall() 

111 

112 def lookup_cast(self, lookup_type, internal_type=None): 

113 lookup = "%s" 

114 

115 # Cast text lookups to text to allow things like filter(x__contains=4) 

116 if lookup_type in (  # 116 ↛ 127: line 116 didn't jump to line 127, because the condition on line 116 was never true

117 "iexact", 

118 "contains", 

119 "icontains", 

120 "startswith", 

121 "istartswith", 

122 "endswith", 

123 "iendswith", 

124 "regex", 

125 "iregex", 

126 ): 

127 if internal_type in ("IPAddressField", "GenericIPAddressField"): 

128 lookup = "HOST(%s)" 

129 elif internal_type in ("CICharField", "CIEmailField", "CITextField"): 

130 lookup = "%s::citext" 

131 else: 

132 lookup = "%s::text" 

133 

134 # Use UPPER(x) for case-insensitive lookups; it's faster. 

135 if lookup_type in ("iexact", "icontains", "istartswith", "iendswith"):  # 135 ↛ 136: line 135 didn't jump to line 136, because the condition on line 135 was never true

136 lookup = "UPPER(%s)" % lookup 

137 

138 return lookup 

139 

140 def no_limit_value(self): 

141 return None 

142 

143 def prepare_sql_script(self, sql): 

144 return [sql] 

145 

146 def quote_name(self, name): 

147 if name.startswith('"') and name.endswith('"'):  # 147 ↛ 148: line 147 didn't jump to line 148, because the condition on line 147 was never true

148 return name # Quoting once is enough. 

149 return '"%s"' % name 

150 

151 def set_time_zone_sql(self): 

152 return "SET TIME ZONE %s" 

153 

154 def sql_flush(self, style, tables, *, reset_sequences=False, allow_cascade=False): 

155 if not tables: 

156 return [] 

157 

158 # Perform a single SQL 'TRUNCATE x, y, z...;' statement. It allows us 

159 # to truncate tables referenced by a foreign key in any other table. 

160 sql_parts = [ 

161 style.SQL_KEYWORD("TRUNCATE"), 

162 ", ".join(style.SQL_FIELD(self.quote_name(table)) for table in tables), 

163 ] 

164 if reset_sequences: 

165 sql_parts.append(style.SQL_KEYWORD("RESTART IDENTITY")) 

166 if allow_cascade: 

167 sql_parts.append(style.SQL_KEYWORD("CASCADE")) 

168 return ["%s;" % " ".join(sql_parts)] 

169 

170 def sequence_reset_by_name_sql(self, style, sequences): 

171 # 'ALTER SEQUENCE sequence_name RESTART WITH 1;'... style SQL statements 

172 # to reset sequence indices 

173 sql = [] 

174 for sequence_info in sequences: 

175 table_name = sequence_info["table"] 

176 # 'id' will be the case if it's an m2m using an autogenerated 

177 # intermediate table (see BaseDatabaseIntrospection.sequence_list). 

178 column_name = sequence_info["column"] or "id" 

179 sql.append( 

180 "%s setval(pg_get_serial_sequence('%s','%s'), 1, false);" 

181 % ( 

182 style.SQL_KEYWORD("SELECT"), 

183 style.SQL_TABLE(self.quote_name(table_name)), 

184 style.SQL_FIELD(column_name), 

185 ) 

186 ) 

187 return sql 

188 

189 def tablespace_sql(self, tablespace, inline=False): 

190 if inline: 

191 return "USING INDEX TABLESPACE %s" % self.quote_name(tablespace) 

192 else: 

193 return "TABLESPACE %s" % self.quote_name(tablespace) 

194 

195 def sequence_reset_sql(self, style, model_list): 

196 from django.db import models 

197 

198 output = [] 

199 qn = self.quote_name 

200 for model in model_list: 

201 # Use `coalesce` to set the sequence for each model to the max pk 

202 # value if there are records, or 1 if there are none. Set the 

203 # `is_called` property (the third argument to `setval`) to true if 

204 # there are records (as the max pk value is already in use), 

205 # otherwise set it to false. Use pg_get_serial_sequence to get the 

206 # underlying sequence name from the table name and column name. 

207 

208 for f in model._meta.local_fields: 

209 if isinstance(f, models.AutoField): 

210 output.append( 

211 "%s setval(pg_get_serial_sequence('%s','%s'), " 

212 "coalesce(max(%s), 1), max(%s) %s null) %s %s;" 

213 % ( 

214 style.SQL_KEYWORD("SELECT"), 

215 style.SQL_TABLE(qn(model._meta.db_table)), 

216 style.SQL_FIELD(f.column), 

217 style.SQL_FIELD(qn(f.column)), 

218 style.SQL_FIELD(qn(f.column)), 

219 style.SQL_KEYWORD("IS NOT"), 

220 style.SQL_KEYWORD("FROM"), 

221 style.SQL_TABLE(qn(model._meta.db_table)), 

222 ) 

223 ) 

224 # Only one AutoField is allowed per model, so don't bother 

225 # continuing. 

226 break 

227 return output 

228 

229 def prep_for_iexact_query(self, x): 

230 return x 

231 

232 def max_name_length(self): 

233 """ 

234 Return the maximum length of an identifier. 

235 

236 The maximum length of an identifier is 63 by default, but can be 

237 changed by recompiling PostgreSQL after editing the NAMEDATALEN 

238 macro in src/include/pg_config_manual.h. 

239 

240 This implementation returns 63, but can be overridden by a custom 

241 database backend that inherits most of its behavior from this one. 

242 """ 

243 return 63 

244 

245 def distinct_sql(self, fields, params): 

246 if fields: 

247 params = [param for param_list in params for param in param_list] 

248 return (["DISTINCT ON (%s)" % ", ".join(fields)], params) 

249 else: 

250 return ["DISTINCT"], [] 

251 

252 def last_executed_query(self, cursor, sql, params): 

253 # https://www.psycopg.org/docs/cursor.html#cursor.query 

254 # The query attribute is a Psycopg extension to the DB API 2.0. 

255 if cursor.query is not None: 

256 return cursor.query.decode() 

257 return None 

258 

259 def return_insert_columns(self, fields): 

260 if not fields:  # 260 ↛ 261: line 260 didn't jump to line 261, because the condition on line 260 was never true

261 return "", () 

262 columns = [ 

263 "%s.%s" 

264 % ( 

265 self.quote_name(field.model._meta.db_table), 

266 self.quote_name(field.column), 

267 ) 

268 for field in fields 

269 ] 

270 return "RETURNING %s" % ", ".join(columns), () 

271 

272 def bulk_insert_sql(self, fields, placeholder_rows): 

273 placeholder_rows_sql = (", ".join(row) for row in placeholder_rows) 

274 values_sql = ", ".join("(%s)" % sql for sql in placeholder_rows_sql) 

275 return "VALUES " + values_sql 

276 

277 def adapt_datefield_value(self, value): 

278 return value 

279 

280 def adapt_datetimefield_value(self, value): 

281 return value 

282 

283 def adapt_timefield_value(self, value): 

284 return value 

285 

286 def adapt_decimalfield_value(self, value, max_digits=None, decimal_places=None): 

287 return value 

288 

289 def adapt_ipaddressfield_value(self, value): 

290 if value: 

291 return Inet(value) 

292 return None 

293 

294 def subtract_temporals(self, internal_type, lhs, rhs): 

295 if internal_type == "DateField": 

296 lhs_sql, lhs_params = lhs 

297 rhs_sql, rhs_params = rhs 

298 params = (*lhs_params, *rhs_params) 

299 return "(interval '1 day' * (%s - %s))" % (lhs_sql, rhs_sql), params 

300 return super().subtract_temporals(internal_type, lhs, rhs) 

301 

302 def explain_query_prefix(self, format=None, **options): 

303 extra = {} 

304 # Normalize options. 

305 if options: 

306 options = { 

307 name.upper(): "true" if value else "false" 

308 for name, value in options.items() 

309 } 

310 for valid_option in self.explain_options: 

311 value = options.pop(valid_option, None) 

312 if value is not None: 

313 extra[valid_option.upper()] = value 

314 prefix = super().explain_query_prefix(format, **options) 

315 if format: 

316 extra["FORMAT"] = format 

317 if extra: 

318 prefix += " (%s)" % ", ".join("%s %s" % i for i in extra.items()) 

319 return prefix 

320 

321 def ignore_conflicts_suffix_sql(self, ignore_conflicts=None): 

322 return ( 

323 "ON CONFLICT DO NOTHING" 

324 if ignore_conflicts 

325 else super().ignore_conflicts_suffix_sql(ignore_conflicts) 

326 )