Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/db/backends/postgresql/schema.py: 69%
95 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1import psycopg2
3from django.db.backends.base.schema import BaseDatabaseSchemaEditor
4from django.db.backends.ddl_references import IndexColumns
5from django.db.backends.utils import strip_quotes
8class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
# --- Sequence statements -------------------------------------------------
sql_create_sequence = "CREATE SEQUENCE %(sequence)s"
sql_delete_sequence = "DROP SEQUENCE IF EXISTS %(sequence)s CASCADE"
sql_set_sequence_max = "SELECT setval('%(sequence)s', MAX(%(column)s)) FROM %(table)s"
sql_set_sequence_owner = "ALTER SEQUENCE %(sequence)s OWNED BY %(table)s.%(column)s"

# --- Index statements (plain and CONCURRENTLY variants) ------------------
sql_create_index = (
    "CREATE INDEX %(name)s ON %(table)s%(using)s (%(columns)s)"
    "%(include)s%(extra)s%(condition)s"
)
sql_create_index_concurrently = (
    "CREATE INDEX CONCURRENTLY %(name)s ON %(table)s%(using)s (%(columns)s)"
    "%(include)s%(extra)s%(condition)s"
)
sql_delete_index = "DROP INDEX IF EXISTS %(name)s"
sql_delete_index_concurrently = "DROP INDEX CONCURRENTLY IF EXISTS %(name)s"

# --- Foreign key statements ----------------------------------------------
# The trailing SET CONSTRAINTS ... IMMEDIATE lets data be changed in the
# same transaction that adds the inline foreign key.
sql_create_column_inline_fk = (
    "CONSTRAINT %(name)s REFERENCES %(to_table)s(%(to_column)s)%(deferrable)s; "
    "SET CONSTRAINTS %(namespace)s%(name)s IMMEDIATE"
)
# Switching the constraint to IMMEDIATE first runs any deferred checks, so
# the constraint can be dropped in the same transaction.
sql_delete_fk = (
    "SET CONSTRAINTS %(name)s IMMEDIATE;"
    " ALTER TABLE %(table)s DROP CONSTRAINT %(name)s"
)

# --- Stored procedures ---------------------------------------------------
sql_delete_procedure = "DROP FUNCTION %(procedure)s(%(param_types)s)"
def quote_value(self, value):
    """
    Return ``value`` rendered as a quoted SQL literal (a ``str``).

    String inputs have every ``%`` doubled before quoting (presumably so the
    literal survives later %-style interpolation -- confirm against callers).
    The value is adapted through psycopg2 and, when the adapter exposes an
    ``encoding`` attribute, that encoding is forced to UTF-8.
    """
    escaped = value.replace("%", "%%") if isinstance(value, str) else value
    wrapper = psycopg2.extensions.adapt(escaped)
    if hasattr(wrapper, "encoding"):
        wrapper.encoding = "utf8"
    # getquoted() yields bytes; hand back text.
    quoted_bytes = wrapper.getquoted()
    return quoted_bytes.decode()
def _field_indexes_sql(self, model, field):
    """
    Return the index statements for ``field``.

    Starts from the list produced by the parent implementation and appends
    the extra "_like" index statement when ``_create_like_index_sql``
    provides one.
    """
    statements = super()._field_indexes_sql(model, field)
    like_sql = self._create_like_index_sql(model, field)
    if like_sql is None:
        return statements
    statements.append(like_sql)
    return statements
def _field_data_type(self, field):
    """
    Return the data type associated with ``field`` on this connection.

    Relation fields report ``field.rel_db_type(connection)``. Any other
    field is looked up by internal type in ``connection.data_types``,
    falling back to ``field.db_type(connection)`` when there is no entry.
    """
    connection = self.connection
    if not field.is_relation:
        internal_type = field.get_internal_type()
        # Computed up front, exactly like a positional default to .get().
        fallback = field.db_type(connection)
        return connection.data_types.get(internal_type, fallback)
    return field.rel_db_type(connection)
def _field_base_data_types(self, field):
    """
    Yield the data type of the innermost base field of an array field.

    Nested array fields are unwrapped recursively through ``base_field``
    until a base field whose internal type is not "ArrayField" is found.
    """
    inner = field.base_field
    if inner.get_internal_type() != "ArrayField":
        yield self._field_data_type(inner)
        return
    # Still an array: keep descending.
    yield from self._field_base_data_types(inner)
def _create_like_index_sql(self, model, field):
    """
    Return the statement creating the extra "_like" pattern-ops index for
    ``field``, or None when no such index applies.

    An index is produced only for indexed or unique fields whose column
    type starts with "varchar" or "text".
    """
    column_type = field.db_type(connection=self.connection)
    if column_type is None or not (field.db_index or field.unique):
        return None
    # Array columns such as varchar[size] / text[size] do not get the
    # operator-class index.
    if "[" in column_type:
        return None
    # varchar/text columns need a second index naming their operator class
    # so LIKE queries work correctly outside the C locale. See #12234.
    if column_type.startswith("varchar"):
        opclass = "varchar_pattern_ops"
    elif column_type.startswith("text"):
        opclass = "text_pattern_ops"
    else:
        return None
    return self._create_index_sql(
        model,
        fields=[field],
        suffix="_like",
        opclasses=[opclass],
    )
def _alter_column_type_sql(self, model, old_field, new_field, new_type):
    """
    Build the SQL needed to change a column's type.

    Returns a pair ``(fragment, follow_ups)`` where ``fragment`` is an
    ``(sql, params)`` tuple for the ALTER COLUMN ... TYPE clause and
    ``follow_ups`` is a list of further ``(sql, params)`` statements.

    Three cases are handled:
    * ``new_type`` is a serial type: alter to the matching integer type and
      recreate, attach, fast-forward and assign ownership of the sequence.
    * the old column was a serial type: delegate to the parent and also
      drop the now-unused sequence.
    * anything else: delegate to the parent unchanged.

    Side effect: ``self.sql_alter_column_type`` is reassigned on the
    instance on every call, with a USING cast appended when the data type
    differs between the old and new field.
    """
    self.sql_alter_column_type = "ALTER COLUMN %(column)s TYPE %(type)s"
    cast_clause = " USING %(column)s::%(type)s"

    # Decide whether the data type changed and therefore needs a cast.
    new_kind = new_field.get_internal_type()
    old_kind = old_field.get_internal_type()
    if new_kind == "ArrayField" and new_kind == old_kind:
        # Array to array: compare the innermost base data types.
        old_bases = list(self._field_base_data_types(old_field))
        new_bases = list(self._field_base_data_types(new_field))
        type_changed = old_bases != new_bases
    else:
        type_changed = self._field_data_type(old_field) != self._field_data_type(
            new_field
        )
    if type_changed:
        self.sql_alter_column_type += cast_clause

    # Make ALTER TYPE with SERIAL make sense.
    table = strip_quotes(model._meta.db_table)
    serial_fields_map = {
        "bigserial": "bigint",
        "serial": "integer",
        "smallserial": "smallint",
    }
    lowered_type = new_type.lower()

    if lowered_type in serial_fields_map:
        column = strip_quotes(new_field.column)
        sequence_name = "%s_%s_seq" % (table, column)
        quoted_table = self.quote_name(table)
        quoted_column = self.quote_name(column)
        quoted_sequence = self.quote_name(sequence_name)

        alter_type = (
            self.sql_alter_column_type
            % {"column": quoted_column, "type": serial_fields_map[lowered_type]},
            [],
        )
        drop_sequence = (
            self.sql_delete_sequence % {"sequence": quoted_sequence},
            [],
        )
        create_sequence = (
            self.sql_create_sequence % {"sequence": quoted_sequence},
            [],
        )
        default_change = self.sql_alter_column_default % {
            "column": quoted_column,
            "default": "nextval('%s')" % quoted_sequence,
        }
        set_default = (
            self.sql_alter_column
            % {"table": quoted_table, "changes": default_change},
            [],
        )
        sequence_refs = {
            "table": quoted_table,
            "column": quoted_column,
            "sequence": quoted_sequence,
        }
        set_sequence_max = (self.sql_set_sequence_max % sequence_refs, [])
        set_sequence_owner = (self.sql_set_sequence_owner % sequence_refs, [])
        return alter_type, [
            drop_sequence,
            create_sequence,
            set_default,
            set_sequence_max,
            set_sequence_owner,
        ]

    old_db_type = old_field.db_parameters(connection=self.connection)["type"]
    if old_db_type in serial_fields_map:
        # Drop the sequence if migrating away from AutoField.
        column = strip_quotes(new_field.column)
        sequence_name = "%s_%s_seq" % (table, column)
        fragment, _ = super()._alter_column_type_sql(
            model, old_field, new_field, new_type
        )
        drop_sequence = (
            self.sql_delete_sequence
            % {"sequence": self.quote_name(sequence_name)},
            [],
        )
        return fragment, [drop_sequence]

    return super()._alter_column_type_sql(model, old_field, new_field, new_type)
def _alter_field(
    self,
    model,
    old_field,
    new_field,
    old_type,
    new_type,
    old_db_params,
    new_db_params,
    strict=False,
):
    """
    Alter a field, keeping the extra "_like" index in sync.

    Around the parent's ``_alter_field`` this:
    1. drops the "_like" index first when an indexed/unique varchar, text
       or citext column is changing to a different type family;
    2. creates a "_like" index afterwards when the field newly gained
       ``db_index`` or ``unique``;
    3. drops the "_like" index afterwards when a previously unique field
       is now neither indexed nor unique.
    """
    old_has_index = old_field.db_index or old_field.unique

    # Drop indexes on varchar/text/citext columns that are changing to a
    # different type.
    if old_has_index and any(
        old_type.startswith(prefix) and not new_type.startswith(prefix)
        for prefix in ("varchar", "text", "citext")
    ):
        stale_like_index = self._create_index_name(
            model._meta.db_table, [old_field.column], suffix="_like"
        )
        self.execute(self._delete_index_sql(model, stale_like_index))

    super()._alter_field(
        model,
        old_field,
        new_field,
        old_type,
        new_type,
        old_db_params,
        new_db_params,
        strict,
    )

    # Added an index? Create any PostgreSQL-specific indexes.
    gained_index = not old_has_index and new_field.db_index
    gained_unique = not old_field.unique and new_field.unique
    if gained_index or gained_unique:
        like_sql = self._create_like_index_sql(model, new_field)
        if like_sql is not None:
            self.execute(like_sql)

    # Removed an index? Drop any PostgreSQL-specific indexes.
    # NOTE(review): only a previously *unique* field is handled here; a
    # dropped plain db_index is presumably cleaned up elsewhere -- confirm.
    new_has_index = new_field.db_index or new_field.unique
    if old_field.unique and not new_has_index:
        index_to_remove = self._create_index_name(
            model._meta.db_table, [old_field.column], suffix="_like"
        )
        self.execute(self._delete_index_sql(model, index_to_remove))
def _index_columns(self, table, columns, col_suffixes, opclasses):
    """
    Return the column reference object used when rendering an index.

    With operator classes, an ``IndexColumns`` carrying them is built;
    without any, the parent implementation is used.
    """
    if not opclasses:
        return super()._index_columns(table, columns, col_suffixes, opclasses)
    return IndexColumns(
        table,
        columns,
        self.quote_name,
        col_suffixes=col_suffixes,
        opclasses=opclasses,
    )
def add_index(self, model, index, concurrently=False):
    """
    Execute the statement that creates ``index`` on ``model``.

    ``concurrently`` is forwarded to ``index.create_sql``; the statement is
    executed with ``params=None``.
    """
    statement = index.create_sql(model, self, concurrently=concurrently)
    self.execute(statement, params=None)
def remove_index(self, model, index, concurrently=False):
    """
    Execute the statement that drops ``index`` from ``model``.

    ``concurrently`` is forwarded to ``index.remove_sql``.
    """
    statement = index.remove_sql(model, self, concurrently=concurrently)
    self.execute(statement)
def _delete_index_sql(self, model, name, sql=None, concurrently=False):
    """
    Return the DROP INDEX statement for the index called ``name``.

    The template is picked from ``concurrently`` alone: the CONCURRENTLY
    variant when true, the plain variant otherwise. The ``sql`` argument is
    accepted but always replaced by that choice before delegating to the
    parent implementation.
    """
    if concurrently:
        template = self.sql_delete_index_concurrently
    else:
        template = self.sql_delete_index
    return super()._delete_index_sql(model, name, template)
def _create_index_sql(
    self,
    model,
    *,
    fields=None,
    name=None,
    suffix="",
    using="",
    db_tablespace=None,
    col_suffixes=(),
    sql=None,
    opclasses=(),
    condition=None,
    concurrently=False,
    include=None,
    expressions=None,
):
    """
    Return the CREATE INDEX statement for ``model``.

    ``sql`` is the statement template. When the caller supplies one it is
    used as given; previously it was silently discarded. When it is omitted
    (or empty), the template is chosen from ``concurrently``:
    ``sql_create_index_concurrently`` if true, ``sql_create_index``
    otherwise.

    ``concurrently`` is consumed here and is not forwarded to the parent
    implementation; every other argument is passed through unchanged.
    """
    if not sql:
        # No explicit template: fall back to the built-in one matching the
        # requested (non-)concurrent mode.
        sql = (
            self.sql_create_index_concurrently
            if concurrently
            else self.sql_create_index
        )
    return super()._create_index_sql(
        model,
        fields=fields,
        name=name,
        suffix=suffix,
        using=using,
        db_tablespace=db_tablespace,
        col_suffixes=col_suffixes,
        sql=sql,
        opclasses=opclasses,
        condition=condition,
        include=include,
        expressions=expressions,
    )