Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/db/backends/postgresql/operations.py: 36%
162 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1from psycopg2.extras import Inet
3from django.conf import settings
4from django.db.backends.base.operations import BaseDatabaseOperations
5from django.db.backends.utils import split_tzname_delta
class DatabaseOperations(BaseDatabaseOperations):
    """PostgreSQL-specific implementation of the database operations API.

    Encapsulates the SQL syntax and value-adaptation details particular to
    PostgreSQL: EXTRACT/DATE_TRUNC date handling, ``AT TIME ZONE``
    conversions, single-statement TRUNCATE flushing, sequence resets via
    ``pg_get_serial_sequence``, EXPLAIN options, and psycopg2 adaptations.
    """

    # An unbounded "varchar" is a valid type on PostgreSQL, so no max_length
    # is needed when casting char fields.
    cast_char_field_without_max_length = "varchar"
    explain_prefix = "EXPLAIN"
    # Options accepted inside EXPLAIN (...); consumed by explain_query_prefix().
    explain_options = frozenset(
        [
            "ANALYZE",
            "BUFFERS",
            "COSTS",
            "SETTINGS",
            "SUMMARY",
            "TIMING",
            "VERBOSE",
            "WAL",
        ]
    )
    # Auto fields are backed by plain integer columns, so cast to those types.
    cast_data_types = {
        "AutoField": "integer",
        "BigAutoField": "bigint",
        "SmallAutoField": "smallint",
    }

    def unification_cast_sql(self, output_field):
        """Return a placeholder template that casts a value for use in a UNION."""
        internal_type = output_field.get_internal_type()
        if internal_type in (
            "GenericIPAddressField",
            "IPAddressField",
            "TimeField",
            "UUIDField",
        ):
            # PostgreSQL will resolve a union as type 'text' if input types are
            # 'unknown'.
            # https://www.postgresql.org/docs/current/typeconv-union-case.html
            # These fields cannot be implicitly cast back in the default
            # PostgreSQL configuration so we need to explicitly cast them.
            # We must also remove components of the type within brackets:
            # varchar(255) -> varchar.
            return (
                "CAST(%%s AS %s)" % output_field.db_type(self.connection).split("(")[0]
            )
        return "%s"

    def date_extract_sql(self, lookup_type, field_name):
        """Return SQL extracting a date component (year, month, ...) from a column."""
        # https://www.postgresql.org/docs/current/functions-datetime.html#FUNCTIONS-DATETIME-EXTRACT
        if lookup_type == "week_day":
            # For consistency across backends, we return Sunday=1, Saturday=7.
            return "EXTRACT('dow' FROM %s) + 1" % field_name
        elif lookup_type == "iso_week_day":
            return "EXTRACT('isodow' FROM %s)" % field_name
        elif lookup_type == "iso_year":
            return "EXTRACT('isoyear' FROM %s)" % field_name
        else:
            return "EXTRACT('%s' FROM %s)" % (lookup_type, field_name)

    def date_trunc_sql(self, lookup_type, field_name, tzname=None):
        """Return SQL truncating a date column to the given precision."""
        field_name = self._convert_field_to_tz(field_name, tzname)
        # https://www.postgresql.org/docs/current/functions-datetime.html#FUNCTIONS-DATETIME-TRUNC
        return "DATE_TRUNC('%s', %s)" % (lookup_type, field_name)

    def _prepare_tzname_delta(self, tzname):
        """Normalize a tzname with a UTC offset suffix (e.g. 'UTC+05:00').

        PostgreSQL's AT TIME ZONE interprets a POSIX-style numeric offset
        with the opposite sign to ISO 8601, so flip it here.
        """
        tzname, sign, offset = split_tzname_delta(tzname)
        if offset:
            sign = "-" if sign == "+" else "+"
            return f"{tzname}{sign}{offset}"
        return tzname

    def _convert_field_to_tz(self, field_name, tzname):
        """Wrap field_name with AT TIME ZONE when time-zone support is active."""
        if tzname and settings.USE_TZ:
            field_name = "%s AT TIME ZONE '%s'" % (
                field_name,
                self._prepare_tzname_delta(tzname),
            )
        return field_name

    def datetime_cast_date_sql(self, field_name, tzname):
        """Return SQL casting a datetime column to a date in the given tz."""
        field_name = self._convert_field_to_tz(field_name, tzname)
        return "(%s)::date" % field_name

    def datetime_cast_time_sql(self, field_name, tzname):
        """Return SQL casting a datetime column to a time in the given tz."""
        field_name = self._convert_field_to_tz(field_name, tzname)
        return "(%s)::time" % field_name

    def datetime_extract_sql(self, lookup_type, field_name, tzname):
        """Return SQL extracting a component from a tz-converted datetime."""
        field_name = self._convert_field_to_tz(field_name, tzname)
        return self.date_extract_sql(lookup_type, field_name)

    def datetime_trunc_sql(self, lookup_type, field_name, tzname):
        """Return SQL truncating a tz-converted datetime to the given precision."""
        field_name = self._convert_field_to_tz(field_name, tzname)
        # https://www.postgresql.org/docs/current/functions-datetime.html#FUNCTIONS-DATETIME-TRUNC
        return "DATE_TRUNC('%s', %s)" % (lookup_type, field_name)

    def time_trunc_sql(self, lookup_type, field_name, tzname=None):
        """Return SQL truncating a time column to the given precision."""
        field_name = self._convert_field_to_tz(field_name, tzname)
        return "DATE_TRUNC('%s', %s)::time" % (lookup_type, field_name)

    def deferrable_sql(self):
        """Return the suffix making a constraint deferrable to transaction end."""
        return " DEFERRABLE INITIALLY DEFERRED"

    def fetch_returned_insert_rows(self, cursor):
        """
        Given a cursor object that has just performed an INSERT...RETURNING
        statement into a table, return the tuple of returned data.
        """
        return cursor.fetchall()

    def lookup_cast(self, lookup_type, internal_type=None):
        """Return a template casting the lhs of a lookup as PostgreSQL needs."""
        lookup = "%s"

        # Cast text lookups to text to allow things like filter(x__contains=4)
        if lookup_type in (
            "iexact",
            "contains",
            "icontains",
            "startswith",
            "istartswith",
            "endswith",
            "iendswith",
            "regex",
            "iregex",
        ):
            if internal_type in ("IPAddressField", "GenericIPAddressField"):
                lookup = "HOST(%s)"
            elif internal_type in ("CICharField", "CIEmailField", "CITextField"):
                lookup = "%s::citext"
            else:
                lookup = "%s::text"

        # Use UPPER(x) for case-insensitive lookups; it's faster.
        if lookup_type in ("iexact", "icontains", "istartswith", "iendswith"):
            lookup = "UPPER(%s)" % lookup

        return lookup

    def no_limit_value(self):
        """PostgreSQL supports LIMIT-less OFFSET, so no sentinel is needed."""
        return None

    def prepare_sql_script(self, sql):
        """psycopg2 can execute a multi-statement script in one call."""
        return [sql]

    def quote_name(self, name):
        """Double-quote an identifier unless it is already quoted."""
        if name.startswith('"') and name.endswith('"'):
            return name  # Quoting once is enough.
        return '"%s"' % name

    def set_time_zone_sql(self):
        """Return the parameterized statement that sets the session time zone."""
        return "SET TIME ZONE %s"

    def sql_flush(self, style, tables, *, reset_sequences=False, allow_cascade=False):
        """Return SQL emptying the given tables (and optionally sequences)."""
        if not tables:
            return []

        # Perform a single SQL 'TRUNCATE x, y, z...;' statement. It allows us
        # to truncate tables referenced by a foreign key in any other table.
        sql_parts = [
            style.SQL_KEYWORD("TRUNCATE"),
            ", ".join(style.SQL_FIELD(self.quote_name(table)) for table in tables),
        ]
        if reset_sequences:
            sql_parts.append(style.SQL_KEYWORD("RESTART IDENTITY"))
        if allow_cascade:
            sql_parts.append(style.SQL_KEYWORD("CASCADE"))
        return ["%s;" % " ".join(sql_parts)]

    def sequence_reset_by_name_sql(self, style, sequences):
        """Return statements resetting the given sequences back to 1."""
        # 'ALTER SEQUENCE sequence_name RESTART WITH 1;'... style SQL statements
        # to reset sequence indices
        sql = []
        for sequence_info in sequences:
            table_name = sequence_info["table"]
            # 'id' will be the case if it's an m2m using an autogenerated
            # intermediate table (see BaseDatabaseIntrospection.sequence_list).
            column_name = sequence_info["column"] or "id"
            sql.append(
                "%s setval(pg_get_serial_sequence('%s','%s'), 1, false);"
                % (
                    style.SQL_KEYWORD("SELECT"),
                    style.SQL_TABLE(self.quote_name(table_name)),
                    style.SQL_FIELD(column_name),
                )
            )
        return sql

    def tablespace_sql(self, tablespace, inline=False):
        """Return the clause placing a table or index in a tablespace."""
        if inline:
            return "USING INDEX TABLESPACE %s" % self.quote_name(tablespace)
        else:
            return "TABLESPACE %s" % self.quote_name(tablespace)

    def sequence_reset_sql(self, style, model_list):
        """Return statements syncing each model's sequence with its max pk."""
        from django.db import models

        output = []
        qn = self.quote_name
        for model in model_list:
            # Use `coalesce` to set the sequence for each model to the max pk
            # value if there are records, or 1 if there are none. Set the
            # `is_called` property (the third argument to `setval`) to true if
            # there are records (as the max pk value is already in use),
            # otherwise set it to false. Use pg_get_serial_sequence to get the
            # underlying sequence name from the table name and column name.

            for f in model._meta.local_fields:
                if isinstance(f, models.AutoField):
                    output.append(
                        "%s setval(pg_get_serial_sequence('%s','%s'), "
                        "coalesce(max(%s), 1), max(%s) %s null) %s %s;"
                        % (
                            style.SQL_KEYWORD("SELECT"),
                            style.SQL_TABLE(qn(model._meta.db_table)),
                            style.SQL_FIELD(f.column),
                            style.SQL_FIELD(qn(f.column)),
                            style.SQL_FIELD(qn(f.column)),
                            style.SQL_KEYWORD("IS NOT"),
                            style.SQL_KEYWORD("FROM"),
                            style.SQL_TABLE(qn(model._meta.db_table)),
                        )
                    )
                    # Only one AutoField is allowed per model, so don't bother
                    # continuing.
                    break
        return output

    def prep_for_iexact_query(self, x):
        """No transformation needed; UPPER() is applied in lookup_cast()."""
        return x

    def max_name_length(self):
        """
        Return the maximum length of an identifier.

        The maximum length of an identifier is 63 by default, but can be
        changed by recompiling PostgreSQL after editing the NAMEDATALEN
        macro in src/include/pg_config_manual.h.

        This implementation returns 63, but can be overridden by a custom
        database backend that inherits most of its behavior from this one.
        """
        return 63

    def distinct_sql(self, fields, params):
        """Return the DISTINCT [ON (...)] clause and its flattened params."""
        if fields:
            params = [param for param_list in params for param in param_list]
            return (["DISTINCT ON (%s)" % ", ".join(fields)], params)
        else:
            return ["DISTINCT"], []

    def last_executed_query(self, cursor, sql, params):
        """Return the last query as sent to the server, if available."""
        # https://www.psycopg.org/docs/cursor.html#cursor.query
        # The query attribute is a Psycopg extension to the DB API 2.0.
        if cursor.query is not None:
            return cursor.query.decode()
        return None

    def return_insert_columns(self, fields):
        """Return a RETURNING clause (and params) for the given fields."""
        if not fields:
            return "", ()
        columns = [
            "%s.%s"
            % (
                self.quote_name(field.model._meta.db_table),
                self.quote_name(field.column),
            )
            for field in fields
        ]
        return "RETURNING %s" % ", ".join(columns), ()

    def bulk_insert_sql(self, fields, placeholder_rows):
        """Return the VALUES clause joining one placeholder group per row."""
        placeholder_rows_sql = (", ".join(row) for row in placeholder_rows)
        values_sql = ", ".join("(%s)" % sql for sql in placeholder_rows_sql)
        return "VALUES " + values_sql

    # psycopg2 adapts date/time/decimal values natively; pass them through.
    def adapt_datefield_value(self, value):
        return value

    def adapt_datetimefield_value(self, value):
        return value

    def adapt_timefield_value(self, value):
        return value

    def adapt_decimalfield_value(self, value, max_digits=None, decimal_places=None):
        return value

    def adapt_ipaddressfield_value(self, value):
        """Wrap non-empty IP addresses in psycopg2's Inet adapter."""
        if value:
            return Inet(value)
        return None

    def subtract_temporals(self, internal_type, lhs, rhs):
        """Return SQL (and params) for the difference of two temporal values."""
        if internal_type == "DateField":
            lhs_sql, lhs_params = lhs
            rhs_sql, rhs_params = rhs
            params = (*lhs_params, *rhs_params)
            # Date subtraction yields an integer day count; scale it back to
            # an interval so the result type matches other backends.
            return "(interval '1 day' * (%s - %s))" % (lhs_sql, rhs_sql), params
        return super().subtract_temporals(internal_type, lhs, rhs)

    def explain_query_prefix(self, format=None, **options):
        """Build the EXPLAIN prefix, translating recognized options."""
        extra = {}
        # Normalize options.
        if options:
            options = {
                name.upper(): "true" if value else "false"
                for name, value in options.items()
            }
            for valid_option in self.explain_options:
                value = options.pop(valid_option, None)
                if value is not None:
                    extra[valid_option.upper()] = value
        # Any options left unpopped are unknown; let the base class validate.
        prefix = super().explain_query_prefix(format, **options)
        if format:
            extra["FORMAT"] = format
        if extra:
            prefix += " (%s)" % ", ".join("%s %s" % i for i in extra.items())
        return prefix

    def ignore_conflicts_suffix_sql(self, ignore_conflicts=None):
        """Return ON CONFLICT DO NOTHING when conflicts should be ignored."""
        return (
            "ON CONFLICT DO NOTHING"
            if ignore_conflicts
            else super().ignore_conflicts_suffix_sql(ignore_conflicts)
        )