Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/db/backends/base/schema.py: 69%

641 statements  

coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1 import logging

2 from datetime import datetime

3 

4 from django.db.backends.ddl_references import (

5 Columns, 

6 Expressions, 

7 ForeignKeyName, 

8 IndexName, 

9 Statement, 

10 Table, 

11 )

12 from django.db.backends.utils import names_digest, split_identifier

13 from django.db.models import Deferrable, Index

14 from django.db.models.sql import Query

15 from django.db.transaction import TransactionManagementError, atomic

16 from django.utils import timezone

17 

18 logger = logging.getLogger("django.db.backends.schema")

19 

20 

21 def _is_relevant_relation(relation, altered_field):

22 """ 

23 When altering the given field, must constraints on its model from the given 

24 relation be temporarily dropped? 

25 """ 

26 field = relation.field 

27 if field.many_to_many: 27 ↛ 29: line 27 didn't jump to line 29, because the condition on line 27 was never true

28 # M2M reverse field 

29 return False 

30 if altered_field.primary_key and field.to_fields == [None]: 30 ↛ 32: line 30 didn't jump to line 32, because the condition on line 30 was never true

31 # Foreign key constraint on the primary key, which is being altered. 

32 return True 

33 # Is the constraint targeting the field being altered? 

34 return altered_field.name in field.to_fields 

35 

36 

37 def _all_related_fields(model):

38 return model._meta._get_fields( 

39 forward=False, 

40 reverse=True, 

41 include_hidden=True, 

42 include_parents=False, 

43 ) 

44 

45 

46 def _related_non_m2m_objects(old_field, new_field):

47 # Filter out m2m objects from reverse relations. 

48 # Return (old_relation, new_relation) tuples. 

49 related_fields = zip( 

50 ( 

51 obj 

52 for obj in _all_related_fields(old_field.model) 

53 if _is_relevant_relation(obj, old_field) 

54 ), 

55 ( 

56 obj 

57 for obj in _all_related_fields(new_field.model) 

58 if _is_relevant_relation(obj, new_field) 

59 ), 

60 ) 

61 for old_rel, new_rel in related_fields: 61 ↛ 62: line 61 didn't jump to line 62, because the loop on line 61 never started

62 yield old_rel, new_rel 

63 yield from _related_non_m2m_objects( 

64 old_rel.remote_field, 

65 new_rel.remote_field, 

66 ) 

67 

68 

69 class BaseDatabaseSchemaEditor:

70 """ 

71 This class and its subclasses are responsible for emitting schema-changing 

72 statements to the databases - model creation/removal/alteration, field 

73 renaming, index fiddling, and so on. 

74 """ 
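# ---- [Editor's illustration: not part of schema.py] -------------------------------
# A minimal, hypothetical usage sketch: backends expose a subclass of this class via
# connection.schema_editor(), used as a context manager so that deferred SQL and the
# optional atomic block (see __enter__/__exit__ below) are handled on exit. "myapp"
# and "Author" are assumed names, not part of Django.
def _example_schema_editor_usage():
    from django.db import connection
    from myapp.models import Author  # hypothetical model

    with connection.schema_editor() as schema_editor:
        schema_editor.create_model(Author)
# ---- [end of editor's illustration] ------------------------------------------------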

75 

76 # Overrideable SQL templates 

77 sql_create_table = "CREATE TABLE %(table)s (%(definition)s)" 

78 sql_rename_table = "ALTER TABLE %(old_table)s RENAME TO %(new_table)s" 

79 sql_retablespace_table = "ALTER TABLE %(table)s SET TABLESPACE %(new_tablespace)s" 

80 sql_delete_table = "DROP TABLE %(table)s CASCADE" 

81 

82 sql_create_column = "ALTER TABLE %(table)s ADD COLUMN %(column)s %(definition)s" 

83 sql_alter_column = "ALTER TABLE %(table)s %(changes)s" 

84 sql_alter_column_type = "ALTER COLUMN %(column)s TYPE %(type)s" 

85 sql_alter_column_null = "ALTER COLUMN %(column)s DROP NOT NULL" 

86 sql_alter_column_not_null = "ALTER COLUMN %(column)s SET NOT NULL" 

87 sql_alter_column_default = "ALTER COLUMN %(column)s SET DEFAULT %(default)s" 

88 sql_alter_column_no_default = "ALTER COLUMN %(column)s DROP DEFAULT" 

89 sql_alter_column_no_default_null = sql_alter_column_no_default 

90 sql_alter_column_collate = "ALTER COLUMN %(column)s TYPE %(type)s%(collation)s" 

91 sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s CASCADE" 

92 sql_rename_column = ( 

93 "ALTER TABLE %(table)s RENAME COLUMN %(old_column)s TO %(new_column)s" 

94 ) 

95 sql_update_with_default = ( 

96 "UPDATE %(table)s SET %(column)s = %(default)s WHERE %(column)s IS NULL" 

97 ) 

98 

99 sql_unique_constraint = "UNIQUE (%(columns)s)%(deferrable)s" 

100 sql_check_constraint = "CHECK (%(check)s)" 

101 sql_delete_constraint = "ALTER TABLE %(table)s DROP CONSTRAINT %(name)s" 

102 sql_constraint = "CONSTRAINT %(name)s %(constraint)s" 

103 

104 sql_create_check = "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s CHECK (%(check)s)" 

105 sql_delete_check = sql_delete_constraint 

106 

107 sql_create_unique = ( 

108 "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s " 

109 "UNIQUE (%(columns)s)%(deferrable)s" 

110 ) 

111 sql_delete_unique = sql_delete_constraint 

112 

113 sql_create_fk = ( 

114 "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s FOREIGN KEY (%(column)s) " 

115 "REFERENCES %(to_table)s (%(to_column)s)%(deferrable)s" 

116 ) 

117 sql_create_inline_fk = None 

118 sql_create_column_inline_fk = None 

119 sql_delete_fk = sql_delete_constraint 

120 

121 sql_create_index = ( 

122 "CREATE INDEX %(name)s ON %(table)s " 

123 "(%(columns)s)%(include)s%(extra)s%(condition)s" 

124 ) 

125 sql_create_unique_index = ( 

126 "CREATE UNIQUE INDEX %(name)s ON %(table)s " 

127 "(%(columns)s)%(include)s%(condition)s" 

128 ) 

129 sql_delete_index = "DROP INDEX %(name)s" 

130 

131 sql_create_pk = ( 

132 "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)" 

133 ) 

134 sql_delete_pk = sql_delete_constraint 

135 

136 sql_delete_procedure = "DROP PROCEDURE %(procedure)s" 

137 

138 def __init__(self, connection, collect_sql=False, atomic=True): 

139 self.connection = connection 

140 self.collect_sql = collect_sql 

141 if self.collect_sql: 141 ↛ 142: line 141 didn't jump to line 142, because the condition on line 141 was never true

142 self.collected_sql = [] 

143 self.atomic_migration = self.connection.features.can_rollback_ddl and atomic 

144 

145 # State-managing methods 

146 

147 def __enter__(self): 

148 self.deferred_sql = [] 

149 if self.atomic_migration: 149 ↛ 152: line 149 didn't jump to line 152, because the condition on line 149 was never false

150 self.atomic = atomic(self.connection.alias) 

151 self.atomic.__enter__() 

152 return self 

153 

154 def __exit__(self, exc_type, exc_value, traceback): 

155 if exc_type is None: 155 ↛ 158: line 155 didn't jump to line 158, because the condition on line 155 was never false

156 for sql in self.deferred_sql: 

157 self.execute(sql) 

158 if self.atomic_migration: 158 ↛ exit: line 158 didn't return from function '__exit__', because the condition on line 158 was never false

159 self.atomic.__exit__(exc_type, exc_value, traceback) 

160 

161 # Core utility functions 

162 

163 def execute(self, sql, params=()): 

164 """Execute the given SQL statement, with optional parameters.""" 

165 # Don't perform the transactional DDL check if SQL is being collected 

166 # as it's not going to be executed anyway. 

167 if ( 167 ↛ 172: line 167 didn't jump to line 172

168 not self.collect_sql 

169 and self.connection.in_atomic_block 

170 and not self.connection.features.can_rollback_ddl 

171 ): 

172 raise TransactionManagementError( 

173 "Executing DDL statements while in a transaction on databases " 

174 "that can't perform a rollback is prohibited." 

175 ) 

176 # Account for non-string statement objects. 

177 sql = str(sql) 

178 # Log the command we're running, then run it 

179 logger.debug( 

180 "%s; (params %r)", sql, params, extra={"params": params, "sql": sql} 

181 ) 

182 if self.collect_sql: 182 ↛ 183: line 182 didn't jump to line 183, because the condition on line 182 was never true

183 ending = "" if sql.rstrip().endswith(";") else ";" 

184 if params is not None: 

185 self.collected_sql.append( 

186 (sql % tuple(map(self.quote_value, params))) + ending 

187 ) 

188 else: 

189 self.collected_sql.append(sql + ending) 

190 else: 

191 with self.connection.cursor() as cursor: 

192 cursor.execute(sql, params) 
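# ---- [Editor's illustration: not part of schema.py] -------------------------------
# A sketch of the collect_sql branch above: with collect_sql=True the editor gathers
# the DDL it would have run instead of executing it (this is roughly how
# "manage.py sqlmigrate" previews migrations). "myapp"/"Author" are hypothetical.
def _example_collect_sql():
    from django.db import connection
    from myapp.models import Author  # hypothetical model

    with connection.schema_editor(collect_sql=True, atomic=False) as editor:
        editor.create_model(Author)
    print("\n".join(editor.collected_sql))
# ---- [end of editor's illustration] ------------------------------------------------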

193 

194 def quote_name(self, name): 

195 return self.connection.ops.quote_name(name) 

196 

197 def table_sql(self, model): 

198 """Take a model and return its table definition.""" 

199 # Add any unique_togethers (always deferred, as some fields might be 

200 # created afterward, like geometry fields with some backends). 

201 for field_names in model._meta.unique_together: 

202 fields = [model._meta.get_field(field) for field in field_names] 

203 self.deferred_sql.append(self._create_unique_sql(model, fields)) 

204 # Create column SQL, add FK deferreds if needed. 

205 column_sqls = [] 

206 params = [] 

207 for field in model._meta.local_fields: 

208 # SQL. 

209 definition, extra_params = self.column_sql(model, field) 

210 if definition is None: 210 ↛ 211: line 210 didn't jump to line 211, because the condition on line 210 was never true

211 continue 

212 # Check constraints can go on the column SQL here. 

213 db_params = field.db_parameters(connection=self.connection) 

214 if db_params["check"]: 

215 definition += " " + self.sql_check_constraint % db_params 

216 # Autoincrement SQL (for backends with inline variant). 

217 col_type_suffix = field.db_type_suffix(connection=self.connection) 

218 if col_type_suffix: 218 ↛ 219: line 218 didn't jump to line 219, because the condition on line 218 was never true

219 definition += " %s" % col_type_suffix 

220 params.extend(extra_params) 

221 # FK. 

222 if field.remote_field and field.db_constraint: 

223 to_table = field.remote_field.model._meta.db_table 

224 to_column = field.remote_field.model._meta.get_field( 

225 field.remote_field.field_name 

226 ).column 

227 if self.sql_create_inline_fk: 227 ↛ 228: line 227 didn't jump to line 228, because the condition on line 227 was never true

228 definition += " " + self.sql_create_inline_fk % { 

229 "to_table": self.quote_name(to_table), 

230 "to_column": self.quote_name(to_column), 

231 } 

232 elif self.connection.features.supports_foreign_keys: 232 ↛ 239: line 232 didn't jump to line 239, because the condition on line 232 was never false

233 self.deferred_sql.append( 

234 self._create_fk_sql( 

235 model, field, "_fk_%(to_table)s_%(to_column)s" 

236 ) 

237 ) 

238 # Add the SQL to our big list. 

239 column_sqls.append( 

240 "%s %s" 

241 % ( 

242 self.quote_name(field.column), 

243 definition, 

244 ) 

245 ) 

246 # Autoincrement SQL (for backends with post table definition 

247 # variant). 

248 if field.get_internal_type() in ( 

249 "AutoField", 

250 "BigAutoField", 

251 "SmallAutoField", 

252 ): 

253 autoinc_sql = self.connection.ops.autoinc_sql( 

254 model._meta.db_table, field.column 

255 ) 

256 if autoinc_sql: 256 ↛ 257: line 256 didn't jump to line 257, because the condition on line 256 was never true

257 self.deferred_sql.extend(autoinc_sql) 

258 constraints = [ 

259 constraint.constraint_sql(model, self) 

260 for constraint in model._meta.constraints 

261 ] 

262 sql = self.sql_create_table % { 

263 "table": self.quote_name(model._meta.db_table), 

264 "definition": ", ".join( 

265 constraint for constraint in (*column_sqls, *constraints) if constraint 

266 ), 

267 } 

268 if model._meta.db_tablespace: 268 ↛ 269: line 268 didn't jump to line 269, because the condition on line 268 was never true

269 tablespace_sql = self.connection.ops.tablespace_sql( 

270 model._meta.db_tablespace 

271 ) 

272 if tablespace_sql: 

273 sql += " " + tablespace_sql 

274 return sql, params 
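# ---- [Editor's illustration: not part of schema.py] -------------------------------
# For a hypothetical model with an AutoField pk and a CharField(max_length=100),
# table_sql() returns roughly the following on SQLite (quoting and column types vary
# by backend; params stays empty unless a literal default is included):
#
#     sql, params = editor.table_sql(Author)
#     # sql    -> 'CREATE TABLE "myapp_author" ("id" integer NOT NULL PRIMARY KEY
#     #            AUTOINCREMENT, "name" varchar(100) NOT NULL)'
#     # params -> []
# ---- [end of editor's illustration] ------------------------------------------------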

275 

276 # Field <-> database mapping functions 

277 

278 def _iter_column_sql(self, column_db_type, params, model, field, include_default): 

279 yield column_db_type 

280 collation = getattr(field, "db_collation", None) 

281 if collation: 281 ↛ 282: line 281 didn't jump to line 282, because the condition on line 281 was never true

282 yield self._collate_sql(collation) 

283 # Work out nullability. 

284 null = field.null 

285 # Include a default value, if requested. 

286 include_default = ( 

287 include_default 

288 and not self.skip_default(field) 

289 and 

290 # Don't include a default value if it's a nullable field and the 

291 # default cannot be dropped in the ALTER COLUMN statement (e.g. 

292 # MySQL longtext and longblob). 

293 not (null and self.skip_default_on_alter(field)) 

294 ) 

295 if include_default: 

296 default_value = self.effective_default(field) 

297 if default_value is not None: 

298 column_default = "DEFAULT " + self._column_default_sql(field) 

299 if self.connection.features.requires_literal_defaults: 299 ↛ 303: line 299 didn't jump to line 303, because the condition on line 299 was never true

300 # Some databases can't take defaults as a parameter (Oracle). 

301 # If this is the case, the individual schema backend should 

302 # implement prepare_default(). 

303 yield column_default % self.prepare_default(default_value) 

304 else: 

305 yield column_default 

306 params.append(default_value) 

307 # Oracle treats the empty string ('') as null, so coerce the null 

308 # option whenever '' is a possible value. 

309 if ( 309 ↛ 314: line 309 didn't jump to line 314

310 field.empty_strings_allowed 

311 and not field.primary_key 

312 and self.connection.features.interprets_empty_strings_as_nulls 

313 ): 

314 null = True 

315 if not null: 

316 yield "NOT NULL" 

317 elif not self.connection.features.implied_column_null: 317 ↛ 319: line 317 didn't jump to line 319, because the condition on line 317 was never false

318 yield "NULL" 

319 if field.primary_key: 

320 yield "PRIMARY KEY" 

321 elif field.unique: 

322 yield "UNIQUE" 

323 # Optionally add the tablespace if it's an implicitly indexed column. 

324 tablespace = field.db_tablespace or model._meta.db_tablespace 

325 if ( 325 ↛ 330: line 325 didn't jump to line 330

326 tablespace 

327 and self.connection.features.supports_tablespaces 

328 and field.unique 

329 ): 

330 yield self.connection.ops.tablespace_sql(tablespace, inline=True) 

331 

332 def column_sql(self, model, field, include_default=False): 

333 """ 

334 Return the column definition for a field. The field must already have 

335 had set_attributes_from_name() called. 

336 """ 

337 # Get the column's type and use that as the basis of the SQL. 

338 db_params = field.db_parameters(connection=self.connection) 

339 column_db_type = db_params["type"] 

340 # Check for fields that aren't actually columns (e.g. M2M). 

341 if column_db_type is None: 

342 return None, None 

343 params = [] 

344 return ( 

345 " ".join( 

346 # This appends to the params being returned. 

347 self._iter_column_sql( 

348 column_db_type, params, model, field, include_default 

349 ) 

350 ), 

351 params, 

352 ) 
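# ---- [Editor's illustration: not part of schema.py] -------------------------------
# column_sql() returns a (definition, params) pair; for a hypothetical
# CharField(max_length=100) it looks roughly like this (backend-dependent), with any
# requested default appended to params rather than interpolated into the SQL:
#
#     definition, params = editor.column_sql(Author, Author._meta.get_field("name"))
#     # definition -> 'varchar(100) NOT NULL'
#     # params     -> []
# ---- [end of editor's illustration] ------------------------------------------------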

353 

354 def skip_default(self, field): 

355 """ 

356 Some backends don't accept default values for certain column types

357 (e.g. MySQL longtext and longblob).

358 """ 

359 return False 

360 

361 def skip_default_on_alter(self, field): 

362 """ 

363 Some backends don't accept default values for certain column types

364 (e.g. MySQL longtext and longblob) in the ALTER COLUMN statement.

365 """ 

366 return False 

367 

368 def prepare_default(self, value): 

369 """ 

370 Only used for backends which have requires_literal_defaults feature 

371 """ 

372 raise NotImplementedError( 

373 "subclasses of BaseDatabaseSchemaEditor for backends which have " 

374 "requires_literal_defaults must provide a prepare_default() method" 

375 ) 

376 

377 def _column_default_sql(self, field): 

378 """ 

379 Return the SQL to use in a DEFAULT clause. The resulting string should 

380 contain a '%s' placeholder for a default value. 

381 """ 

382 return "%s" 

383 

384 @staticmethod 

385 def _effective_default(field): 

386 # This method allows testing its logic without a connection. 

387 if field.has_default(): 

388 default = field.get_default() 

389 elif not field.null and field.blank and field.empty_strings_allowed: 389 ↛ 390: line 389 didn't jump to line 390, because the condition on line 389 was never true

390 if field.get_internal_type() == "BinaryField": 

391 default = b"" 

392 else: 

393 default = "" 

394 elif getattr(field, "auto_now", False) or getattr(field, "auto_now_add", False): 

395 internal_type = field.get_internal_type() 

396 if internal_type == "DateTimeField": 396 ↛ 397: line 396 didn't jump to line 397, because the condition on line 396 was never true

397 default = timezone.now() 

398 else: 

399 default = datetime.now() 

400 if internal_type == "DateField": 400 ↛ 402: line 400 didn't jump to line 402, because the condition on line 400 was never false

401 default = default.date() 

402 elif internal_type == "TimeField": 

403 default = default.time() 

404 else: 

405 default = None 

406 return default 

407 

408 def effective_default(self, field): 

409 """Return a field's effective database default value.""" 

410 return field.get_db_prep_save(self._effective_default(field), self.connection) 
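# ---- [Editor's illustration: not part of schema.py] -------------------------------
# A sketch of the _effective_default() branches above; plain field instances are
# enough, since the static method needs no connection. (The auto_now/auto_now_add
# branch returns a current timestamp and is omitted here because it depends on
# project settings.)
def _example_effective_default():
    from django.db import models
    from django.db.backends.base.schema import BaseDatabaseSchemaEditor

    name = models.CharField(max_length=10, blank=True)  # NOT NULL, blank, '' allowed
    assert BaseDatabaseSchemaEditor._effective_default(name) == ""

    nullable = models.IntegerField(null=True)  # no default, nullable
    assert BaseDatabaseSchemaEditor._effective_default(nullable) is None
# ---- [end of editor's illustration] ------------------------------------------------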

411 

412 def quote_value(self, value): 

413 """ 

414 Return a quoted version of the value so it's safe to use in an SQL 

415 string. This is not safe against injection from user code; it is 

416 intended only for use in making SQL scripts or preparing default values 

417 for particularly tricky backends (defaults are not user-defined, though, 

418 so this is safe). 

419 """ 

420 raise NotImplementedError() 

421 

422 # Actions 

423 

424 def create_model(self, model): 

425 """ 

426 Create a table and any accompanying indexes or unique constraints for 

427 the given `model`. 

428 """ 

429 sql, params = self.table_sql(model) 

430 # Prevent using [] as params, in the case a literal '%' is used in the 

431 # definition. 

432 self.execute(sql, params or None) 

433 

434 # Add any field index and index_together's (deferred as SQLite 

435 # _remake_table needs it). 

436 self.deferred_sql.extend(self._model_indexes_sql(model)) 

437 

438 # Make M2M tables 

439 for field in model._meta.local_many_to_many: 

440 if field.remote_field.through._meta.auto_created: 440 ↛ 439: line 440 didn't jump to line 439, because the condition on line 440 was never false

441 self.create_model(field.remote_field.through) 

442 

443 def delete_model(self, model): 

444 """Delete a model from the database.""" 

445 # Handle auto-created intermediary models 

446 for field in model._meta.local_many_to_many: 

447 if field.remote_field.through._meta.auto_created: 447 ↛ 446: line 447 didn't jump to line 446, because the condition on line 447 was never false

448 self.delete_model(field.remote_field.through) 

449 

450 # Delete the table 

451 self.execute( 

452 self.sql_delete_table 

453 % { 

454 "table": self.quote_name(model._meta.db_table), 

455 } 

456 ) 

457 # Remove all deferred statements referencing the deleted table. 

458 for sql in list(self.deferred_sql): 

459 if isinstance(sql, Statement) and sql.references_table( 459 ↛ 462: line 459 didn't jump to line 462, because the condition on line 459 was never true

460 model._meta.db_table 

461 ): 

462 self.deferred_sql.remove(sql) 

463 

464 def add_index(self, model, index): 

465 """Add an index on a model.""" 

466 if ( 

467 index.contains_expressions 

468 and not self.connection.features.supports_expression_indexes 

469 ): 

470 return None 

471 # Index.create_sql returns interpolated SQL which makes params=None a 

472 # necessity to avoid escaping attempts on execution. 

473 self.execute(index.create_sql(model, self), params=None) 
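# ---- [Editor's illustration: not part of schema.py] -------------------------------
# Adding an index outside of a migration, with hypothetical "myapp"/"Author" names;
# remove_index() takes the same arguments to drop it again.
def _example_add_index():
    from django.db import connection, models
    from myapp.models import Author  # hypothetical model

    index = models.Index(fields=["name"], name="author_name_idx")
    with connection.schema_editor() as editor:
        editor.add_index(Author, index)
# ---- [end of editor's illustration] ------------------------------------------------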

474 

475 def remove_index(self, model, index): 

476 """Remove an index from a model.""" 

477 if ( 

478 index.contains_expressions 

479 and not self.connection.features.supports_expression_indexes 

480 ): 

481 return None 

482 self.execute(index.remove_sql(model, self)) 

483 

484 def add_constraint(self, model, constraint): 

485 """Add a constraint to a model.""" 

486 sql = constraint.create_sql(model, self) 

487 if sql: 

488 # Constraint.create_sql returns interpolated SQL which makes 

489 # params=None a necessity to avoid escaping attempts on execution. 

490 self.execute(sql, params=None) 

491 

492 def remove_constraint(self, model, constraint): 

493 """Remove a constraint from a model.""" 

494 sql = constraint.remove_sql(model, self) 

495 if sql: 

496 self.execute(sql) 

497 

498 def alter_unique_together(self, model, old_unique_together, new_unique_together): 

499 """ 

500 Deal with a model changing its unique_together. The input 

501 unique_togethers must be doubly-nested, not the single-nested 

502 ["foo", "bar"] format. 

503 """ 

504 olds = {tuple(fields) for fields in old_unique_together} 

505 news = {tuple(fields) for fields in new_unique_together} 

506 # Deleted uniques 

507 for fields in olds.difference(news): 

508 self._delete_composed_index( 

509 model, fields, {"unique": True}, self.sql_delete_unique 

510 ) 

511 # Created uniques 

512 for field_names in news.difference(olds): 

513 fields = [model._meta.get_field(field) for field in field_names] 

514 self.execute(self._create_unique_sql(model, fields)) 
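# ---- [Editor's illustration: not part of schema.py] -------------------------------
# The unique_together arguments must be doubly nested even for a single constraint,
# as the docstring above notes. Hypothetical example: "Author" moves its constraint
# from (name, birthdate) to (name, email):
def _example_alter_unique_together():
    from django.db import connection
    from myapp.models import Author  # hypothetical model

    with connection.schema_editor() as editor:
        editor.alter_unique_together(
            Author,
            old_unique_together=[("name", "birthdate")],
            new_unique_together=[("name", "email")],
        )
# ---- [end of editor's illustration] ------------------------------------------------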

515 

516 def alter_index_together(self, model, old_index_together, new_index_together): 

517 """ 

518 Deal with a model changing its index_together. The input 

519 index_togethers must be doubly-nested, not the single-nested 

520 ["foo", "bar"] format. 

521 """ 

522 olds = {tuple(fields) for fields in old_index_together} 

523 news = {tuple(fields) for fields in new_index_together} 

524 # Deleted indexes 

525 for fields in olds.difference(news): 

526 self._delete_composed_index( 

527 model, 

528 fields, 

529 {"index": True, "unique": False}, 

530 self.sql_delete_index, 

531 ) 

532 # Created indexes 

533 for field_names in news.difference(olds): 

534 fields = [model._meta.get_field(field) for field in field_names] 

535 self.execute(self._create_index_sql(model, fields=fields, suffix="_idx")) 

536 

537 def _delete_composed_index(self, model, fields, constraint_kwargs, sql): 

538 meta_constraint_names = { 

539 constraint.name for constraint in model._meta.constraints 

540 } 

541 meta_index_names = {constraint.name for constraint in model._meta.indexes} 

542 columns = [model._meta.get_field(field).column for field in fields] 

543 constraint_names = self._constraint_names( 

544 model, 

545 columns, 

546 exclude=meta_constraint_names | meta_index_names, 

547 **constraint_kwargs, 

548 ) 

549 if len(constraint_names) != 1: 549 ↛ 550: line 549 didn't jump to line 550, because the condition on line 549 was never true

550 raise ValueError( 

551 "Found wrong number (%s) of constraints for %s(%s)" 

552 % ( 

553 len(constraint_names), 

554 model._meta.db_table, 

555 ", ".join(columns), 

556 ) 

557 ) 

558 self.execute(self._delete_constraint_sql(sql, model, constraint_names[0])) 

559 

560 def alter_db_table(self, model, old_db_table, new_db_table): 

561 """Rename the table a model points to.""" 

562 if old_db_table == new_db_table or ( 

563 self.connection.features.ignores_table_name_case 

564 and old_db_table.lower() == new_db_table.lower() 

565 ): 

566 return 

567 self.execute( 

568 self.sql_rename_table 

569 % { 

570 "old_table": self.quote_name(old_db_table), 

571 "new_table": self.quote_name(new_db_table), 

572 } 

573 ) 

574 # Rename all references to the old table name. 

575 for sql in self.deferred_sql: 

576 if isinstance(sql, Statement): 

577 sql.rename_table_references(old_db_table, new_db_table) 

578 

579 def alter_db_tablespace(self, model, old_db_tablespace, new_db_tablespace): 

580 """Move a model's table between tablespaces.""" 

581 self.execute( 

582 self.sql_retablespace_table 

583 % { 

584 "table": self.quote_name(model._meta.db_table), 

585 "old_tablespace": self.quote_name(old_db_tablespace), 

586 "new_tablespace": self.quote_name(new_db_tablespace), 

587 } 

588 ) 

589 

590 def add_field(self, model, field): 

591 """ 

592 Create a field on a model. Usually involves adding a column, but may 

593 involve adding a table instead (for M2M fields). 

594 """ 

595 # Special-case implicit M2M tables 

596 if field.many_to_many and field.remote_field.through._meta.auto_created: 

597 return self.create_model(field.remote_field.through) 

598 # Get the column's definition 

599 definition, params = self.column_sql(model, field, include_default=True) 

600 # It might not actually have a column behind it 

601 if definition is None: 

602 return 

603 # Check constraints can go on the column SQL here 

604 db_params = field.db_parameters(connection=self.connection) 

605 if db_params["check"]: 

606 definition += " " + self.sql_check_constraint % db_params 

607 if ( 

608 field.remote_field 

609 and self.connection.features.supports_foreign_keys 

610 and field.db_constraint 

611 ): 

612 constraint_suffix = "_fk_%(to_table)s_%(to_column)s" 

613 # Add FK constraint inline, if supported. 

614 if self.sql_create_column_inline_fk: 614 ↛ 632: line 614 didn't jump to line 632, because the condition on line 614 was never false

615 to_table = field.remote_field.model._meta.db_table 

616 to_column = field.remote_field.model._meta.get_field( 

617 field.remote_field.field_name 

618 ).column 

619 namespace, _ = split_identifier(model._meta.db_table) 

620 definition += " " + self.sql_create_column_inline_fk % { 

621 "name": self._fk_constraint_name(model, field, constraint_suffix), 

622 "namespace": "%s." % self.quote_name(namespace) 

623 if namespace 

624 else "", 

625 "column": self.quote_name(field.column), 

626 "to_table": self.quote_name(to_table), 

627 "to_column": self.quote_name(to_column), 

628 "deferrable": self.connection.ops.deferrable_sql(), 

629 } 

630 # Otherwise, add FK constraints later. 

631 else: 

632 self.deferred_sql.append( 

633 self._create_fk_sql(model, field, constraint_suffix) 

634 ) 

635 # Build the SQL and run it 

636 sql = self.sql_create_column % { 

637 "table": self.quote_name(model._meta.db_table), 

638 "column": self.quote_name(field.column), 

639 "definition": definition, 

640 } 

641 self.execute(sql, params) 

642 # Drop the default if we need to 

643 # (Django usually does not use in-database defaults) 

644 if ( 

645 not self.skip_default_on_alter(field) 

646 and self.effective_default(field) is not None 

647 ): 

648 changes_sql, params = self._alter_column_default_sql( 

649 model, None, field, drop=True 

650 ) 

651 sql = self.sql_alter_column % { 

652 "table": self.quote_name(model._meta.db_table), 

653 "changes": changes_sql, 

654 } 

655 self.execute(sql, params) 

656 # Add an index, if required 

657 self.deferred_sql.extend(self._field_indexes_sql(model, field)) 

658 # Reset connection if required 

659 if self.connection.features.connection_persists_old_columns: 659 ↛ 660: line 659 didn't jump to line 660, because the condition on line 659 was never true

660 self.connection.close() 

661 

662 def remove_field(self, model, field): 

663 """ 

664 Remove a field from a model. Usually involves deleting a column, 

665 but for M2Ms may involve deleting a table. 

666 """ 

667 # Special-case implicit M2M tables 

668 if field.many_to_many and field.remote_field.through._meta.auto_created: 668 ↛ 669: line 668 didn't jump to line 669, because the condition on line 668 was never true

669 return self.delete_model(field.remote_field.through) 

670 # It might not actually have a column behind it 

671 if field.db_parameters(connection=self.connection)["type"] is None: 671 ↛ 672: line 671 didn't jump to line 672, because the condition on line 671 was never true

672 return 

673 # Drop any FK constraints, MySQL requires explicit deletion 

674 if field.remote_field: 

675 fk_names = self._constraint_names(model, [field.column], foreign_key=True) 

676 for fk_name in fk_names: 

677 self.execute(self._delete_fk_sql(model, fk_name)) 

678 # Delete the column 

679 sql = self.sql_delete_column % { 

680 "table": self.quote_name(model._meta.db_table), 

681 "column": self.quote_name(field.column), 

682 } 

683 self.execute(sql) 

684 # Reset connection if required 

685 if self.connection.features.connection_persists_old_columns: 685 ↛ 686: line 685 didn't jump to line 686, because the condition on line 685 was never true

686 self.connection.close() 

687 # Remove all deferred statements referencing the deleted column. 

688 for sql in list(self.deferred_sql): 

689 if isinstance(sql, Statement) and sql.references_column( 689 ↛ 692: line 689 didn't jump to line 692, because the condition on line 689 was never true

690 model._meta.db_table, field.column 

691 ): 

692 self.deferred_sql.remove(sql) 

693 

694 def alter_field(self, model, old_field, new_field, strict=False): 

695 """ 

696 Allow a field's type, uniqueness, nullability, default, column, 

697 constraints, etc. to be modified. 

698 `old_field` is required to compute the necessary changes. 

699 If `strict` is True, raise errors if the old column does not match 

700 `old_field` precisely. 

701 """ 

702 if not self._field_should_be_altered(old_field, new_field): 

703 return 

704 # Ensure this field is even column-based 

705 old_db_params = old_field.db_parameters(connection=self.connection) 

706 old_type = old_db_params["type"] 

707 new_db_params = new_field.db_parameters(connection=self.connection) 

708 new_type = new_db_params["type"] 

709 if (old_type is None and old_field.remote_field is None) or ( 709 ↛ 712: line 709 didn't jump to line 712, because the condition on line 709 was never true

710 new_type is None and new_field.remote_field is None 

711 ): 

712 raise ValueError( 

713 "Cannot alter field %s into %s - they do not properly define " 

714 "db_type (are you using a badly-written custom field?)" 

715 % (old_field, new_field), 

716 ) 

717 elif ( 717 ↛ 727: line 717 didn't jump to line 727

718 old_type is None 

719 and new_type is None 

720 and ( 

721 old_field.remote_field.through 

722 and new_field.remote_field.through 

723 and old_field.remote_field.through._meta.auto_created 

724 and new_field.remote_field.through._meta.auto_created 

725 ) 

726 ): 

727 return self._alter_many_to_many(model, old_field, new_field, strict) 

728 elif ( 728 ↛ 739: line 728 didn't jump to line 739

729 old_type is None 

730 and new_type is None 

731 and ( 

732 old_field.remote_field.through 

733 and new_field.remote_field.through 

734 and not old_field.remote_field.through._meta.auto_created 

735 and not new_field.remote_field.through._meta.auto_created 

736 ) 

737 ): 

738 # Both sides have through models; this is a no-op. 

739 return 

740 elif old_type is None or new_type is None: 740 ↛ 741: line 740 didn't jump to line 741, because the condition on line 740 was never true

741 raise ValueError( 

742 "Cannot alter field %s into %s - they are not compatible types " 

743 "(you cannot alter to or from M2M fields, or add or remove " 

744 "through= on M2M fields)" % (old_field, new_field) 

745 ) 

746 

747 self._alter_field( 

748 model, 

749 old_field, 

750 new_field, 

751 old_type, 

752 new_type, 

753 old_db_params, 

754 new_db_params, 

755 strict, 

756 ) 

757 

758 def _alter_field( 

759 self, 

760 model, 

761 old_field, 

762 new_field, 

763 old_type, 

764 new_type, 

765 old_db_params, 

766 new_db_params, 

767 strict=False, 

768 ): 

769 """Perform a "physical" (non-ManyToMany) field update.""" 

770 # Drop any FK constraints, we'll remake them later 

771 fks_dropped = set() 

772 if ( 

773 self.connection.features.supports_foreign_keys 

774 and old_field.remote_field 

775 and old_field.db_constraint 

776 ): 

777 fk_names = self._constraint_names( 

778 model, [old_field.column], foreign_key=True 

779 ) 

780 if strict and len(fk_names) != 1: 780 ↛ 781: line 780 didn't jump to line 781, because the condition on line 780 was never true

781 raise ValueError( 

782 "Found wrong number (%s) of foreign key constraints for %s.%s" 

783 % ( 

784 len(fk_names), 

785 model._meta.db_table, 

786 old_field.column, 

787 ) 

788 ) 

789 for fk_name in fk_names: 

790 fks_dropped.add((old_field.column,)) 

791 self.execute(self._delete_fk_sql(model, fk_name)) 

792 # Has unique been removed? 

793 if old_field.unique and ( 

794 not new_field.unique or self._field_became_primary_key(old_field, new_field) 

795 ): 

796 # Find the unique constraint for this field 

797 meta_constraint_names = { 

798 constraint.name for constraint in model._meta.constraints 

799 } 

800 constraint_names = self._constraint_names( 

801 model, 

802 [old_field.column], 

803 unique=True, 

804 primary_key=False, 

805 exclude=meta_constraint_names, 

806 ) 

807 if strict and len(constraint_names) != 1: 807 ↛ 808: line 807 didn't jump to line 808, because the condition on line 807 was never true

808 raise ValueError( 

809 "Found wrong number (%s) of unique constraints for %s.%s" 

810 % ( 

811 len(constraint_names), 

812 model._meta.db_table, 

813 old_field.column, 

814 ) 

815 ) 

816 for constraint_name in constraint_names: 

817 self.execute(self._delete_unique_sql(model, constraint_name)) 

818 # Drop incoming FK constraints if the field is a primary key or unique, 

819 # which might be a to_field target, and things are going to change. 

820 drop_foreign_keys = ( 

821 self.connection.features.supports_foreign_keys 

822 and ( 

823 (old_field.primary_key and new_field.primary_key) 

824 or (old_field.unique and new_field.unique) 

825 ) 

826 and old_type != new_type 

827 ) 

828 if drop_foreign_keys: 

829 # '_meta.related_field' also contains M2M reverse fields, these 

830 # will be filtered out 

831 for _old_rel, new_rel in _related_non_m2m_objects(old_field, new_field): 831 ↛ 832: line 831 didn't jump to line 832, because the loop on line 831 never started

832 rel_fk_names = self._constraint_names( 

833 new_rel.related_model, [new_rel.field.column], foreign_key=True 

834 ) 

835 for fk_name in rel_fk_names: 

836 self.execute(self._delete_fk_sql(new_rel.related_model, fk_name)) 

837 # Removed an index? (no strict check, as multiple indexes are possible) 

838 # Remove indexes if db_index switched to False or a unique constraint 

839 # will now be used in lieu of an index. The following lines from the 

840 # truth table show all True cases; the rest are False: 

841 # 

842 # old_field.db_index | old_field.unique | new_field.db_index | new_field.unique 

843 # ------------------------------------------------------------------------------ 

844 # True | False | False | False 

845 # True | False | False | True 

846 # True | False | True | True 

847 if ( 

848 old_field.db_index 

849 and not old_field.unique 

850 and (not new_field.db_index or new_field.unique) 

851 ): 

852 # Find the index for this field 

853 meta_index_names = {index.name for index in model._meta.indexes} 

854 # Retrieve only BTREE indexes since this is what's created with 

855 # db_index=True. 

856 index_names = self._constraint_names( 

857 model, 

858 [old_field.column], 

859 index=True, 

860 type_=Index.suffix, 

861 exclude=meta_index_names, 

862 ) 

863 for index_name in index_names: 

864 # The only way to check if an index was created with 

865 # db_index=True or with Index(['field'], name='foo') 

866 # is to look at its name (refs #28053). 

867 self.execute(self._delete_index_sql(model, index_name)) 

868 # Change check constraints? 

869 if old_db_params["check"] != new_db_params["check"] and old_db_params["check"]: 869 ↛ 870: line 869 didn't jump to line 870, because the condition on line 869 was never true

870 meta_constraint_names = { 

871 constraint.name for constraint in model._meta.constraints 

872 } 

873 constraint_names = self._constraint_names( 

874 model, 

875 [old_field.column], 

876 check=True, 

877 exclude=meta_constraint_names, 

878 ) 

879 if strict and len(constraint_names) != 1: 

880 raise ValueError( 

881 "Found wrong number (%s) of check constraints for %s.%s" 

882 % ( 

883 len(constraint_names), 

884 model._meta.db_table, 

885 old_field.column, 

886 ) 

887 ) 

888 for constraint_name in constraint_names: 

889 self.execute(self._delete_check_sql(model, constraint_name)) 

890 # Have they renamed the column? 

891 if old_field.column != new_field.column: 

892 self.execute( 

893 self._rename_field_sql( 

894 model._meta.db_table, old_field, new_field, new_type 

895 ) 

896 ) 

897 # Rename all references to the renamed column. 

898 for sql in self.deferred_sql: 898 ↛ 899: line 898 didn't jump to line 899, because the loop on line 898 never started

899 if isinstance(sql, Statement): 

900 sql.rename_column_references( 

901 model._meta.db_table, old_field.column, new_field.column 

902 ) 

903 # Next, start accumulating actions to do 

904 actions = [] 

905 null_actions = [] 

906 post_actions = [] 

907 # Collation change? 

908 old_collation = getattr(old_field, "db_collation", None) 

909 new_collation = getattr(new_field, "db_collation", None) 

910 if old_collation != new_collation: 910 ↛ 912: line 910 didn't jump to line 912, because the condition on line 910 was never true

911 # Collation change handles also a type change. 

912 fragment = self._alter_column_collation_sql( 

913 model, new_field, new_type, new_collation 

914 ) 

915 actions.append(fragment) 

916 # Type change? 

917 elif old_type != new_type: 

918 fragment, other_actions = self._alter_column_type_sql( 

919 model, old_field, new_field, new_type 

920 ) 

921 actions.append(fragment) 

922 post_actions.extend(other_actions) 

923 # When changing a column NULL constraint to NOT NULL with a given 

924 # default value, we need to perform 4 steps: 

925 # 1. Add a default for new incoming writes 

926 # 2. Update existing NULL rows with new default 

927 # 3. Replace NULL constraint with NOT NULL 

928 # 4. Drop the default again. 
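# ---- [Editor's illustration: not part of schema.py] -------------------------------
# Using the SQL templates defined at the top of this class, the four steps map to
# roughly the following statements (PostgreSQL-flavoured, names illustrative):
#   1. ALTER TABLE "t" ALTER COLUMN "c" SET DEFAULT 'x';
#   2. UPDATE "t" SET "c" = 'x' WHERE "c" IS NULL;
#   3. ALTER TABLE "t" ALTER COLUMN "c" SET NOT NULL;
#   4. ALTER TABLE "t" ALTER COLUMN "c" DROP DEFAULT;
# ---- [end of editor's illustration] ------------------------------------------------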

929 # Default change? 

930 needs_database_default = False 

931 if old_field.null and not new_field.null: 

932 old_default = self.effective_default(old_field) 

933 new_default = self.effective_default(new_field) 

934 if ( 934 ↛ 939: line 934 didn't jump to line 939

935 not self.skip_default_on_alter(new_field) 

936 and old_default != new_default 

937 and new_default is not None 

938 ): 

939 needs_database_default = True 

940 actions.append( 

941 self._alter_column_default_sql(model, old_field, new_field) 

942 ) 

943 # Nullability change? 

944 if old_field.null != new_field.null: 

945 fragment = self._alter_column_null_sql(model, old_field, new_field) 

946 if fragment: 946 ↛ 949: line 946 didn't jump to line 949, because the condition on line 946 was never false

947 null_actions.append(fragment) 

948 # Only if we have a default and there is a change from NULL to NOT NULL 

949 four_way_default_alteration = new_field.has_default() and ( 

950 old_field.null and not new_field.null 

951 ) 

952 if actions or null_actions: 

953 if not four_way_default_alteration: 953 ↛ 958: line 953 didn't jump to line 958, because the condition on line 953 was never false

954 # If we don't have to do a 4-way default alteration we can 

955 # directly run a (NOT) NULL alteration 

956 actions = actions + null_actions 

957 # Combine actions together if we can (e.g. postgres) 

958 if self.connection.features.supports_combined_alters and actions: 958 ↛ 962: line 958 didn't jump to line 962, because the condition on line 958 was never false

959 sql, params = tuple(zip(*actions)) 

960 actions = [(", ".join(sql), sum(params, []))] 

961 # Apply those actions 

962 for sql, params in actions: 

963 self.execute( 

964 self.sql_alter_column 

965 % { 

966 "table": self.quote_name(model._meta.db_table), 

967 "changes": sql, 

968 }, 

969 params, 

970 ) 

971 if four_way_default_alteration: 971 ↛ 973: line 971 didn't jump to line 973, because the condition on line 971 was never true

972 # Update existing rows with default value 

973 self.execute( 

974 self.sql_update_with_default 

975 % { 

976 "table": self.quote_name(model._meta.db_table), 

977 "column": self.quote_name(new_field.column), 

978 "default": "%s", 

979 }, 

980 [new_default], 

981 ) 

982 # Since we didn't run a NOT NULL change before we need to do it 

983 # now 

984 for sql, params in null_actions: 

985 self.execute( 

986 self.sql_alter_column 

987 % { 

988 "table": self.quote_name(model._meta.db_table), 

989 "changes": sql, 

990 }, 

991 params, 

992 ) 

993 if post_actions: 993 ↛ 994: line 993 didn't jump to line 994, because the condition on line 993 was never true

994 for sql, params in post_actions: 

995 self.execute(sql, params) 

996 # If primary_key changed to False, delete the primary key constraint. 

997 if old_field.primary_key and not new_field.primary_key: 997 ↛ 998: line 997 didn't jump to line 998, because the condition on line 997 was never true

998 self._delete_primary_key(model, strict) 

999 # Added a unique? 

1000 if self._unique_should_be_added(old_field, new_field): 

1001 self.execute(self._create_unique_sql(model, [new_field])) 

1002 # Added an index? Add an index if db_index switched to True or a unique 

1003 # constraint will no longer be used in lieu of an index. The following 

1004 # lines from the truth table show all True cases; the rest are False: 

1005 # 

1006 # old_field.db_index | old_field.unique | new_field.db_index | new_field.unique 

1007 # ------------------------------------------------------------------------------ 

1008 # False | False | True | False 

1009 # False | True | True | False 

1010 # True | True | True | False 

1011 if ( 

1012 (not old_field.db_index or old_field.unique) 

1013 and new_field.db_index 

1014 and not new_field.unique 

1015 ): 

1016 self.execute(self._create_index_sql(model, fields=[new_field])) 

1017 # Type alteration on primary key? Then we need to alter the column 

1018 # referring to us. 

1019 rels_to_update = [] 

1020 if drop_foreign_keys: 

1021 rels_to_update.extend(_related_non_m2m_objects(old_field, new_field)) 

1022 # Changed to become primary key? 

1023 if self._field_became_primary_key(old_field, new_field): 1023 ↛ 1025: line 1023 didn't jump to line 1025, because the condition on line 1023 was never true

1024 # Make the new one 

1025 self.execute(self._create_primary_key_sql(model, new_field)) 

1026 # Update all referencing columns 

1027 rels_to_update.extend(_related_non_m2m_objects(old_field, new_field)) 

1028 # Handle our type alters on the other end of rels from the PK stuff above 

1029 for old_rel, new_rel in rels_to_update: 1029 ↛ 1030: line 1029 didn't jump to line 1030, because the loop on line 1029 never started

1030 rel_db_params = new_rel.field.db_parameters(connection=self.connection) 

1031 rel_type = rel_db_params["type"] 

1032 fragment, other_actions = self._alter_column_type_sql( 

1033 new_rel.related_model, old_rel.field, new_rel.field, rel_type 

1034 ) 

1035 self.execute( 

1036 self.sql_alter_column 

1037 % { 

1038 "table": self.quote_name(new_rel.related_model._meta.db_table), 

1039 "changes": fragment[0], 

1040 }, 

1041 fragment[1], 

1042 ) 

1043 for sql, params in other_actions: 

1044 self.execute(sql, params) 

1045 # Does it have a foreign key? 

1046 if ( 

1047 self.connection.features.supports_foreign_keys 

1048 and new_field.remote_field 

1049 and ( 

1050 fks_dropped or not old_field.remote_field or not old_field.db_constraint 

1051 ) 

1052 and new_field.db_constraint 

1053 ): 

1054 self.execute( 

1055 self._create_fk_sql(model, new_field, "_fk_%(to_table)s_%(to_column)s") 

1056 ) 

1057 # Rebuild FKs that pointed to us if we previously had to drop them 

1058 if drop_foreign_keys: 

1059 for _, rel in rels_to_update: 1059 ↛ 1060: line 1059 didn't jump to line 1060, because the loop on line 1059 never started

1060 if rel.field.db_constraint: 

1061 self.execute( 

1062 self._create_fk_sql(rel.related_model, rel.field, "_fk") 

1063 ) 

1064 # Does it have check constraints we need to add? 

1065 if old_db_params["check"] != new_db_params["check"] and new_db_params["check"]: 1065 ↛ 1066: line 1065 didn't jump to line 1066, because the condition on line 1065 was never true

1066 constraint_name = self._create_index_name( 

1067 model._meta.db_table, [new_field.column], suffix="_check" 

1068 ) 

1069 self.execute( 

1070 self._create_check_sql(model, constraint_name, new_db_params["check"]) 

1071 ) 

1072 # Drop the default if we need to 

1073 # (Django usually does not use in-database defaults) 

1074 if needs_database_default: 1074 ↛ 1075: line 1074 didn't jump to line 1075, because the condition on line 1074 was never true

1075 changes_sql, params = self._alter_column_default_sql( 

1076 model, old_field, new_field, drop=True 

1077 ) 

1078 sql = self.sql_alter_column % { 

1079 "table": self.quote_name(model._meta.db_table), 

1080 "changes": changes_sql, 

1081 } 

1082 self.execute(sql, params) 

1083 # Reset connection if required 

1084 if self.connection.features.connection_persists_old_columns: 1084 ↛ 1085: line 1084 didn't jump to line 1085, because the condition on line 1084 was never true

1085 self.connection.close() 

1086 

1087 def _alter_column_null_sql(self, model, old_field, new_field): 

1088 """ 

1089 Hook to specialize column null alteration. 

1090 

1091 Return a (sql, params) fragment to set a column to null or non-null 

1092 as required by new_field, or None if no changes are required. 

1093 """ 

1094 if ( 1094 ↛ 1099: line 1094 didn't jump to line 1099

1095 self.connection.features.interprets_empty_strings_as_nulls 

1096 and new_field.empty_strings_allowed 

1097 ): 

1098 # The field is nullable in the database anyway, leave it alone. 

1099 return 

1100 else: 

1101 new_db_params = new_field.db_parameters(connection=self.connection) 

1102 sql = ( 

1103 self.sql_alter_column_null 

1104 if new_field.null 

1105 else self.sql_alter_column_not_null 

1106 ) 

1107 return ( 

1108 sql 

1109 % { 

1110 "column": self.quote_name(new_field.column), 

1111 "type": new_db_params["type"], 

1112 }, 

1113 [], 

1114 ) 

1115 

1116 def _alter_column_default_sql(self, model, old_field, new_field, drop=False): 

1117 """ 

1118 Hook to specialize column default alteration. 

1119 

1120 Return a (sql, params) fragment to add or drop (depending on the drop 

1121 argument) a default to new_field's column. 

1122 """ 

1123 new_default = self.effective_default(new_field) 

1124 default = self._column_default_sql(new_field) 

1125 params = [new_default] 

1126 

1127 if drop: 1127 ↛ 1129: line 1127 didn't jump to line 1129, because the condition on line 1127 was never false

1128 params = [] 

1129 elif self.connection.features.requires_literal_defaults: 

1130 # Some databases (Oracle) can't take defaults as a parameter 

1131 # If this is the case, the SchemaEditor for that database should 

1132 # implement prepare_default(). 

1133 default = self.prepare_default(new_default) 

1134 params = [] 

1135 

1136 new_db_params = new_field.db_parameters(connection=self.connection) 

1137 if drop: 1137 ↛ 1143: line 1137 didn't jump to line 1143, because the condition on line 1137 was never false

1138 if new_field.null: 

1139 sql = self.sql_alter_column_no_default_null 

1140 else: 

1141 sql = self.sql_alter_column_no_default 

1142 else: 

1143 sql = self.sql_alter_column_default 

1144 return ( 

1145 sql 

1146 % { 

1147 "column": self.quote_name(new_field.column), 

1148 "type": new_db_params["type"], 

1149 "default": default, 

1150 }, 

1151 params, 

1152 ) 

1153 

1154 def _alter_column_type_sql(self, model, old_field, new_field, new_type): 

1155 """ 

1156 Hook to specialize column type alteration for different backends, 

1157 for cases when a creation type is different to an alteration type 

1158 (e.g. SERIAL in PostgreSQL, PostGIS fields). 

1159 

1160 Return a two-tuple of: an SQL fragment of (sql, params) to insert into 

1161 an ALTER TABLE statement and a list of extra (sql, params) tuples to 

1162 run once the field is altered. 

1163 """ 

1164 return ( 

1165 ( 

1166 self.sql_alter_column_type 

1167 % { 

1168 "column": self.quote_name(new_field.column), 

1169 "type": new_type, 

1170 }, 

1171 [], 

1172 ), 

1173 [], 

1174 ) 

1175 

1176 def _alter_column_collation_sql(self, model, new_field, new_type, new_collation): 

1177 return ( 

1178 self.sql_alter_column_collate 

1179 % { 

1180 "column": self.quote_name(new_field.column), 

1181 "type": new_type, 

1182 "collation": " " + self._collate_sql(new_collation) 

1183 if new_collation 

1184 else "", 

1185 }, 

1186 [], 

1187 ) 

1188 

1189 def _alter_many_to_many(self, model, old_field, new_field, strict): 

1190 """Alter M2Ms to repoint their to= endpoints.""" 

1191 # Rename the through table 

1192 if ( 

1193 old_field.remote_field.through._meta.db_table 

1194 != new_field.remote_field.through._meta.db_table 

1195 ): 

1196 self.alter_db_table( 

1197 old_field.remote_field.through, 

1198 old_field.remote_field.through._meta.db_table, 

1199 new_field.remote_field.through._meta.db_table, 

1200 ) 

1201 # Repoint the FK to the other side 

1202 self.alter_field( 

1203 new_field.remote_field.through, 

1204 # The field that points to the target model is needed, so we can 

1205 # tell alter_field to change it - this is m2m_reverse_field_name() 

1206 # (as opposed to m2m_field_name(), which points to our model). 

1207 old_field.remote_field.through._meta.get_field( 

1208 old_field.m2m_reverse_field_name() 

1209 ), 

1210 new_field.remote_field.through._meta.get_field( 

1211 new_field.m2m_reverse_field_name() 

1212 ), 

1213 ) 

1214 self.alter_field( 

1215 new_field.remote_field.through, 

1216 # for self-referential models we need to alter field from the other end too 

1217 old_field.remote_field.through._meta.get_field(old_field.m2m_field_name()), 

1218 new_field.remote_field.through._meta.get_field(new_field.m2m_field_name()), 

1219 ) 

1220 

1221 def _create_index_name(self, table_name, column_names, suffix=""): 

1222 """ 

1223 Generate a unique name for an index/unique constraint. 

1224 

1225 The name is divided into 3 parts: the table name, the column names, 

1226 and a unique digest and suffix. 

1227 """ 

1228 _, table_name = split_identifier(table_name) 

1229 hash_suffix_part = "%s%s" % ( 

1230 names_digest(table_name, *column_names, length=8), 

1231 suffix, 

1232 ) 

1233 max_length = self.connection.ops.max_name_length() or 200 

1234 # If everything fits into max_length, use that name. 

1235 index_name = "%s_%s_%s" % (table_name, "_".join(column_names), hash_suffix_part) 

1236 if len(index_name) <= max_length: 

1237 return index_name 

1238 # Shorten a long suffix. 

1239 if len(hash_suffix_part) > max_length / 3: 

1240 hash_suffix_part = hash_suffix_part[: max_length // 3] 

1241 other_length = (max_length - len(hash_suffix_part)) // 2 - 1 

1242 index_name = "%s_%s_%s" % ( 

1243 table_name[:other_length], 

1244 "_".join(column_names)[:other_length], 

1245 hash_suffix_part, 

1246 ) 

1247 # Prepend D if needed to prevent the name from starting with an 

1248 # underscore or a number (not permitted on Oracle). 

1249 if index_name[0] == "_" or index_name[0].isdigit(): 1249 ↛ 1250: line 1249 didn't jump to line 1250, because the condition on line 1249 was never true

1250 index_name = "D%s" % index_name[:-1] 

1251 return index_name 
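# ---- [Editor's illustration: not part of schema.py] -------------------------------
# The generated name has the form "<table>_<columns>_<digest><suffix>", shortened to
# the backend's maximum identifier length when necessary, e.g. roughly:
#
#     editor._create_index_name("myapp_author", ["name", "email"], suffix="_idx")
#     # -> 'myapp_author_name_email_0f1e2d3c_idx'   (digest value is illustrative)
# ---- [end of editor's illustration] ------------------------------------------------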

1252 

1253 def _get_index_tablespace_sql(self, model, fields, db_tablespace=None): 

1254 if db_tablespace is None: 1254 ↛ 1259: line 1254 didn't jump to line 1259, because the condition on line 1254 was never false

1255 if len(fields) == 1 and fields[0].db_tablespace: 1255 ↛ 1256: line 1255 didn't jump to line 1256, because the condition on line 1255 was never true

1256 db_tablespace = fields[0].db_tablespace 

1257 elif model._meta.db_tablespace: 1257 ↛ 1258: line 1257 didn't jump to line 1258, because the condition on line 1257 was never true

1258 db_tablespace = model._meta.db_tablespace 

1259 if db_tablespace is not None: 1259 ↛ 1260: line 1259 didn't jump to line 1260, because the condition on line 1259 was never true

1260 return " " + self.connection.ops.tablespace_sql(db_tablespace) 

1261 return "" 

1262 

1263 def _index_condition_sql(self, condition): 

1264 if condition: 1264 ↛ 1265: line 1264 didn't jump to line 1265, because the condition on line 1264 was never true

1265 return " WHERE " + condition 

1266 return "" 

1267 

1268 def _index_include_sql(self, model, columns): 

1269 if not columns or not self.connection.features.supports_covering_indexes: 1269 ↛ 1271: line 1269 didn't jump to line 1271, because the condition on line 1269 was never false

1270 return "" 

1271 return Statement( 

1272 " INCLUDE (%(columns)s)", 

1273 columns=Columns(model._meta.db_table, columns, self.quote_name), 

1274 ) 

1275 

1276 def _create_index_sql( 

1277 self, 

1278 model, 

1279 *, 

1280 fields=None, 

1281 name=None, 

1282 suffix="", 

1283 using="", 

1284 db_tablespace=None, 

1285 col_suffixes=(), 

1286 sql=None, 

1287 opclasses=(), 

1288 condition=None, 

1289 include=None, 

1290 expressions=None, 

1291 ): 

1292 """ 

1293 Return the SQL statement to create the index for one or several fields 

1294 or expressions. `sql` can be specified if the syntax differs from the 

1295 standard (GIS indexes, ...). 

1296 """ 

1297 fields = fields or [] 

1298 expressions = expressions or [] 

1299 compiler = Query(model, alias_cols=False).get_compiler( 

1300 connection=self.connection, 

1301 ) 

1302 tablespace_sql = self._get_index_tablespace_sql( 

1303 model, fields, db_tablespace=db_tablespace 

1304 ) 

1305 columns = [field.column for field in fields] 

1306 sql_create_index = sql or self.sql_create_index 

1307 table = model._meta.db_table 

1308 

1309 def create_index_name(*args, **kwargs): 

1310 nonlocal name 

1311 if name is None: 

1312 name = self._create_index_name(*args, **kwargs) 

1313 return self.quote_name(name) 

1314 

1315 return Statement( 

1316 sql_create_index, 

1317 table=Table(table, self.quote_name), 

1318 name=IndexName(table, columns, suffix, create_index_name), 

1319 using=using, 

1320 columns=( 

1321 self._index_columns(table, columns, col_suffixes, opclasses) 

1322 if columns 

1323 else Expressions(table, expressions, compiler, self.quote_value) 

1324 ), 

1325 extra=tablespace_sql, 

1326 condition=self._index_condition_sql(condition), 

1327 include=self._index_include_sql(model, include), 

1328 ) 

1329 

1330 def _delete_index_sql(self, model, name, sql=None): 

1331 return Statement( 

1332 sql or self.sql_delete_index, 

1333 table=Table(model._meta.db_table, self.quote_name), 

1334 name=self.quote_name(name), 

1335 ) 

1336 

1337 def _index_columns(self, table, columns, col_suffixes, opclasses): 

1338 return Columns(table, columns, self.quote_name, col_suffixes=col_suffixes) 

1339 

1340 def _model_indexes_sql(self, model): 

1341 """ 

1342 Return a list of all index SQL statements (field indexes, 

1343 index_together, Meta.indexes) for the specified model. 

1344 """ 

1345 if not model._meta.managed or model._meta.proxy or model._meta.swapped: 1345 ↛ 1346: line 1345 didn't jump to line 1346, because the condition on line 1345 was never true

1346 return [] 

1347 output = [] 

1348 for field in model._meta.local_fields: 

1349 output.extend(self._field_indexes_sql(model, field)) 

1350 

1351 for field_names in model._meta.index_together: 1351 ↛ 1352: line 1351 didn't jump to line 1352, because the loop on line 1351 never started

1352 fields = [model._meta.get_field(field) for field in field_names] 

1353 output.append(self._create_index_sql(model, fields=fields, suffix="_idx")) 

1354 

1355 for index in model._meta.indexes: 1355 ↛ 1356: line 1355 didn't jump to line 1356, because the loop on line 1355 never started

1356 if ( 

1357 not index.contains_expressions 

1358 or self.connection.features.supports_expression_indexes 

1359 ): 

1360 output.append(index.create_sql(model, self)) 

1361 return output 

1362 

1363 def _field_indexes_sql(self, model, field): 

1364 """ 

1365 Return a list of all index SQL statements for the specified field. 

1366 """ 

1367 output = [] 

1368 if self._field_should_be_indexed(model, field): 

1369 output.append(self._create_index_sql(model, fields=[field])) 

1370 return output 

1371 

1372 def _field_should_be_altered(self, old_field, new_field): 

1373 _, old_path, old_args, old_kwargs = old_field.deconstruct() 

1374 _, new_path, new_args, new_kwargs = new_field.deconstruct() 

1375 # Don't alter when: 

1376 # - changing only a field name 

1377 # - changing an attribute that doesn't affect the schema 

1378 # - adding only a db_column and the column name is not changed 

1379 non_database_attrs = [ 

1380 "blank", 

1381 "db_column", 

1382 "editable", 

1383 "error_messages", 

1384 "help_text", 

1385 "limit_choices_to", 

1386 # Database-level options are not supported, see #21961. 

1387 "on_delete", 

1388 "related_name", 

1389 "related_query_name", 

1390 "validators", 

1391 "verbose_name", 

1392 ] 

1393 for attr in non_database_attrs: 

1394 old_kwargs.pop(attr, None) 

1395 new_kwargs.pop(attr, None) 

1396 return self.quote_name(old_field.column) != self.quote_name( 

1397 new_field.column 

1398 ) or (old_path, old_args, old_kwargs) != (new_path, new_args, new_kwargs) 

1399 
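
# Illustrative sketch (not from schema.py): help_text appears in
# non_database_attrs above, so changing it alone requires no ALTER, while
# changing max_length does. The field name "title" is arbitrary.
from django.db import connection, models

old = models.CharField(max_length=50, help_text="old")
same_schema = models.CharField(max_length=50, help_text="new")
wider = models.CharField(max_length=100, help_text="new")
for field in (old, same_schema, wider):
    field.set_attributes_from_name("title")

with connection.schema_editor(collect_sql=True) as editor:
    assert not editor._field_should_be_altered(old, same_schema)
    assert editor._field_should_be_altered(old, wider)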

1400 def _field_should_be_indexed(self, model, field): 

1401 return field.db_index and not field.unique 

1402 

1403 def _field_became_primary_key(self, old_field, new_field): 

1404 return not old_field.primary_key and new_field.primary_key 

1405 

1406 def _unique_should_be_added(self, old_field, new_field): 

1407 return (not old_field.unique and new_field.unique) or ( 

1408 old_field.primary_key and not new_field.primary_key and new_field.unique 

1409 ) 

1410 

1411 def _rename_field_sql(self, table, old_field, new_field, new_type): 

1412 return self.sql_rename_column % { 

1413 "table": self.quote_name(table), 

1414 "old_column": self.quote_name(old_field.column), 

1415 "new_column": self.quote_name(new_field.column), 

1416 "type": new_type, 

1417 } 

1418 
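
# Illustrative sketch (not from schema.py), assuming a backend whose
# sql_rename_column template is
# "ALTER TABLE %(table)s RENAME COLUMN %(old_column)s TO %(new_column)s":
# renaming "title" to "headline" on a hypothetical news_article table renders
# roughly ALTER TABLE "news_article" RENAME COLUMN "title" TO "headline".
from django.db import connection, models

old = models.CharField(max_length=100)
new = models.CharField(max_length=100)
old.set_attributes_from_name("title")
new.set_attributes_from_name("headline")

with connection.schema_editor(collect_sql=True) as editor:
    print(editor._rename_field_sql("news_article", old, new, new_type="varchar(100)"))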

1419 def _create_fk_sql(self, model, field, suffix): 

1420 table = Table(model._meta.db_table, self.quote_name) 

1421 name = self._fk_constraint_name(model, field, suffix) 

1422 column = Columns(model._meta.db_table, [field.column], self.quote_name) 

1423 to_table = Table(field.target_field.model._meta.db_table, self.quote_name) 

1424 to_column = Columns( 

1425 field.target_field.model._meta.db_table, 

1426 [field.target_field.column], 

1427 self.quote_name, 

1428 ) 

1429 deferrable = self.connection.ops.deferrable_sql() 

1430 return Statement( 

1431 self.sql_create_fk, 

1432 table=table, 

1433 name=name, 

1434 column=column, 

1435 to_table=to_table, 

1436 to_column=to_column, 

1437 deferrable=deferrable, 

1438 ) 

1439 
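
# Illustrative sketch (not from schema.py): hypothetical Review.author foreign
# key to a hypothetical Author model; the Statement pairs the child column with
# the referenced table/column and, on a PostgreSQL-style backend, renders
# roughly as
#   ALTER TABLE "library_review" ADD CONSTRAINT "library_review_author_id_<hash>_fk"
#   FOREIGN KEY ("author_id") REFERENCES "library_author" ("id") DEFERRABLE INITIALLY DEFERRED
from django.db import connection, models

class Author(models.Model):
    class Meta:
        app_label = "library"  # hypothetical app label

class Review(models.Model):
    author = models.ForeignKey(Author, on_delete=models.CASCADE)

    class Meta:
        app_label = "library"

with connection.schema_editor(collect_sql=True) as editor:
    print(editor._create_fk_sql(Review, Review._meta.get_field("author"), "_fk"))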

1440 def _fk_constraint_name(self, model, field, suffix): 

1441 def create_fk_name(*args, **kwargs): 

1442 return self.quote_name(self._create_index_name(*args, **kwargs)) 

1443 

1444 return ForeignKeyName( 

1445 model._meta.db_table, 

1446 [field.column], 

1447 split_identifier(field.target_field.model._meta.db_table)[1], 

1448 [field.target_field.column], 

1449 suffix, 

1450 create_fk_name, 

1451 ) 

1452 

1453 def _delete_fk_sql(self, model, name): 

1454 return self._delete_constraint_sql(self.sql_delete_fk, model, name) 

1455 

1456 def _deferrable_constraint_sql(self, deferrable): 

1457 if deferrable is None: 1457 ↛ 1459 line 1457 didn't jump to line 1459, because the condition on line 1457 was never false

1458 return "" 

1459 if deferrable == Deferrable.DEFERRED: 

1460 return " DEFERRABLE INITIALLY DEFERRED" 

1461 if deferrable == Deferrable.IMMEDIATE: 

1462 return " DEFERRABLE INITIALLY IMMEDIATE" 

1463 
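
# Illustrative sketch (not from schema.py): the mapping implemented above, on a
# backend that does not override this helper.
from django.db import connection
from django.db.models import Deferrable

with connection.schema_editor(collect_sql=True) as editor:
    assert editor._deferrable_constraint_sql(None) == ""
    assert editor._deferrable_constraint_sql(Deferrable.DEFERRED) == " DEFERRABLE INITIALLY DEFERRED"
    assert editor._deferrable_constraint_sql(Deferrable.IMMEDIATE) == " DEFERRABLE INITIALLY IMMEDIATE"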

1464 def _unique_sql( 

1465 self, 

1466 model, 

1467 fields, 

1468 name, 

1469 condition=None, 

1470 deferrable=None, 

1471 include=None, 

1472 opclasses=None, 

1473 expressions=None, 

1474 ): 

1475 if ( 

1476 deferrable 

1477 and not self.connection.features.supports_deferrable_unique_constraints 

1478 ): 

1479 return None 

1480 if condition or include or opclasses or expressions: 

1481 # Databases support conditional, covering, and functional unique 

1482 # constraints via a unique index. 

1483 sql = self._create_unique_sql( 

1484 model, 

1485 fields, 

1486 name=name, 

1487 condition=condition, 

1488 include=include, 

1489 opclasses=opclasses, 

1490 expressions=expressions, 

1491 ) 

1492 if sql: 

1493 self.deferred_sql.append(sql) 

1494 return None 

1495 constraint = self.sql_unique_constraint % { 

1496 "columns": ", ".join([self.quote_name(field.column) for field in fields]), 

1497 "deferrable": self._deferrable_constraint_sql(deferrable), 

1498 } 

1499 return self.sql_constraint % { 

1500 "name": self.quote_name(name), 

1501 "constraint": constraint, 

1502 } 

1503 
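
# Illustrative sketch (not from schema.py): for a plain multi-column unique
# constraint, _unique_sql returns an inline "CONSTRAINT <name> UNIQUE (...)"
# clause suitable for CREATE TABLE; with condition/include/opclasses/expressions
# it queues a unique index in deferred_sql and returns None instead. The Booking
# model and constraint name are hypothetical.
from django.db import connection, models

class Booking(models.Model):
    room = models.CharField(max_length=10)
    night = models.DateField()

    class Meta:
        app_label = "hotel"  # hypothetical app label

with connection.schema_editor(collect_sql=True) as editor:
    fields = [Booking._meta.get_field("room"), Booking._meta.get_field("night")]
    clause = editor._unique_sql(Booking, fields, "uniq_room_night")
    print(clause)  # roughly: CONSTRAINT "uniq_room_night" UNIQUE ("room", "night")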

1504 def _create_unique_sql( 

1505 self, 

1506 model, 

1507 fields, 

1508 name=None, 

1509 condition=None, 

1510 deferrable=None, 

1511 include=None, 

1512 opclasses=None, 

1513 expressions=None, 

1514 ): 

1515 if ( 1515 ↛ 1526 line 1515 didn't jump to line 1526

1516 ( 

1517 deferrable 

1518 and not self.connection.features.supports_deferrable_unique_constraints 

1519 ) 

1520 or (condition and not self.connection.features.supports_partial_indexes) 

1521 or (include and not self.connection.features.supports_covering_indexes) 

1522 or ( 

1523 expressions and not self.connection.features.supports_expression_indexes 

1524 ) 

1525 ): 

1526 return None 

1527 

1528 def create_unique_name(*args, **kwargs): 

1529 return self.quote_name(self._create_index_name(*args, **kwargs)) 

1530 

1531 compiler = Query(model, alias_cols=False).get_compiler( 

1532 connection=self.connection 

1533 ) 

1534 table = model._meta.db_table 

1535 columns = [field.column for field in fields] 

1536 if name is None: 1536 ↛ 1539 line 1536 didn't jump to line 1539, because the condition on line 1536 was never false

1537 name = IndexName(table, columns, "_uniq", create_unique_name) 

1538 else: 

1539 name = self.quote_name(name) 

1540 if condition or include or opclasses or expressions: 1540 ↛ 1541 line 1540 didn't jump to line 1541, because the condition on line 1540 was never true

1541 sql = self.sql_create_unique_index 

1542 else: 

1543 sql = self.sql_create_unique 

1544 if columns: 1544 ↛ 1549 line 1544 didn't jump to line 1549, because the condition on line 1544 was never false

1545 columns = self._index_columns( 

1546 table, columns, col_suffixes=(), opclasses=opclasses 

1547 ) 

1548 else: 

1549 columns = Expressions(table, expressions, compiler, self.quote_value) 

1550 return Statement( 

1551 sql, 

1552 table=Table(table, self.quote_name), 

1553 name=name, 

1554 columns=columns, 

1555 condition=self._index_condition_sql(condition), 

1556 deferrable=self._deferrable_constraint_sql(deferrable), 

1557 include=self._index_include_sql(model, include), 

1558 ) 

1559 
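
# Illustrative sketch (not from schema.py): the standalone ALTER TABLE form of a
# unique constraint, using a hypothetical Reservation model; on most backends
# str(stmt) is roughly
#   ALTER TABLE "hotel_reservation" ADD CONSTRAINT "uniq_guest_night" UNIQUE ("guest", "night")
from django.db import connection, models

class Reservation(models.Model):
    guest = models.CharField(max_length=50)
    night = models.DateField()

    class Meta:
        app_label = "hotel"  # hypothetical app label

with connection.schema_editor(collect_sql=True) as editor:
    fields = [Reservation._meta.get_field("guest"), Reservation._meta.get_field("night")]
    print(editor._create_unique_sql(Reservation, fields, name="uniq_guest_night"))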

1560 def _delete_unique_sql( 

1561 self, 

1562 model, 

1563 name, 

1564 condition=None, 

1565 deferrable=None, 

1566 include=None, 

1567 opclasses=None, 

1568 expressions=None, 

1569 ): 

1570 if ( 1570 ↛ 1581 line 1570 didn't jump to line 1581

1571 ( 

1572 deferrable 

1573 and not self.connection.features.supports_deferrable_unique_constraints 

1574 ) 

1575 or (condition and not self.connection.features.supports_partial_indexes) 

1576 or (include and not self.connection.features.supports_covering_indexes) 

1577 or ( 

1578 expressions and not self.connection.features.supports_expression_indexes 

1579 ) 

1580 ): 

1581 return None 

1582 if condition or include or opclasses or expressions: 1582 ↛ 1583 line 1582 didn't jump to line 1583, because the condition on line 1582 was never true

1583 sql = self.sql_delete_index 

1584 else: 

1585 sql = self.sql_delete_unique 

1586 return self._delete_constraint_sql(sql, model, name) 

1587 

1588 def _check_sql(self, name, check): 

1589 return self.sql_constraint % { 

1590 "name": self.quote_name(name), 

1591 "constraint": self.sql_check_constraint % {"check": check}, 

1592 } 

1593 

1594 def _create_check_sql(self, model, name, check): 

1595 return Statement( 

1596 self.sql_create_check, 

1597 table=Table(model._meta.db_table, self.quote_name), 

1598 name=self.quote_name(name), 

1599 check=check, 

1600 ) 

1601 
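
# Illustrative sketch (not from schema.py): _check_sql builds the inline
# "CONSTRAINT <name> CHECK (...)" clause while _create_check_sql wraps the same
# pieces in a full ALTER TABLE statement. The Ticket model, constraint name, and
# already-rendered check expression are hypothetical.
from django.db import connection, models

class Ticket(models.Model):
    price = models.DecimalField(max_digits=8, decimal_places=2)

    class Meta:
        app_label = "box_office"  # hypothetical app label

with connection.schema_editor(collect_sql=True) as editor:
    clause = editor._check_sql("ticket_price_gte_0", '"price" >= 0')
    stmt = editor._create_check_sql(Ticket, "ticket_price_gte_0", '"price" >= 0')
    print(clause)  # roughly: CONSTRAINT "ticket_price_gte_0" CHECK ("price" >= 0)
    print(stmt)    # roughly: ALTER TABLE "box_office_ticket" ADD CONSTRAINT ... CHECK ("price" >= 0)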

1602 def _delete_check_sql(self, model, name): 

1603 return self._delete_constraint_sql(self.sql_delete_check, model, name) 

1604 

1605 def _delete_constraint_sql(self, template, model, name): 

1606 return Statement( 

1607 template, 

1608 table=Table(model._meta.db_table, self.quote_name), 

1609 name=self.quote_name(name), 

1610 ) 

1611 

1612 def _constraint_names( 

1613 self, 

1614 model, 

1615 column_names=None, 

1616 unique=None, 

1617 primary_key=None, 

1618 index=None, 

1619 foreign_key=None, 

1620 check=None, 

1621 type_=None, 

1622 exclude=None, 

1623 ): 

1624 """Return all constraint names matching the columns and conditions.""" 

1625 if column_names is not None: 1625 ↛ 1630 line 1625 didn't jump to line 1630, because the condition on line 1625 was never false

1626 column_names = [ 

1627 self.connection.introspection.identifier_converter(name) 

1628 for name in column_names 

1629 ] 

1630 with self.connection.cursor() as cursor: 

1631 constraints = self.connection.introspection.get_constraints( 

1632 cursor, model._meta.db_table 

1633 ) 

1634 result = [] 

1635 for name, infodict in constraints.items(): 

1636 if column_names is None or column_names == infodict["columns"]: 

1637 if unique is not None and infodict["unique"] != unique: 

1638 continue 

1639 if primary_key is not None and infodict["primary_key"] != primary_key: 1639 ↛ 1640 line 1639 didn't jump to line 1640, because the condition on line 1639 was never true

1640 continue 

1641 if index is not None and infodict["index"] != index: 1641 ↛ 1642 line 1641 didn't jump to line 1642, because the condition on line 1641 was never true

1642 continue 

1643 if check is not None and infodict["check"] != check: 1643 ↛ 1644 line 1643 didn't jump to line 1644, because the condition on line 1643 was never true

1644 continue 

1645 if foreign_key is not None and not infodict["foreign_key"]: 

1646 continue 

1647 if type_ is not None and infodict["type"] != type_: 1647 ↛ 1648 line 1647 didn't jump to line 1648, because the condition on line 1647 was never true

1648 continue 

1649 if not exclude or name not in exclude: 1649 ↛ 1635 line 1649 didn't jump to line 1635, because the condition on line 1649 was never false

1650 result.append(name) 

1651 return result 

1652 
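
# Illustrative sketch (not from schema.py): how the alter-field paths use this
# lookup to find existing constraints by introspection and drop them by name.
# Kept as a commented sketch because it queries the live database; it assumes a
# hypothetical Page model whose table (with a unique "slug" column) already exists.
# with connection.schema_editor() as editor:
#     names = editor._constraint_names(Page, ["slug"], unique=True, primary_key=False)
#     for name in names:
#         editor.execute(editor._delete_unique_sql(Page, name))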

1653 def _delete_primary_key(self, model, strict=False): 

1654 constraint_names = self._constraint_names(model, primary_key=True) 

1655 if strict and len(constraint_names) != 1: 

1656 raise ValueError( 

1657 "Found wrong number (%s) of PK constraints for %s" 

1658 % ( 

1659 len(constraint_names), 

1660 model._meta.db_table, 

1661 ) 

1662 ) 

1663 for constraint_name in constraint_names: 

1664 self.execute(self._delete_primary_key_sql(model, constraint_name)) 

1665 

1666 def _create_primary_key_sql(self, model, field): 

1667 return Statement( 

1668 self.sql_create_pk, 

1669 table=Table(model._meta.db_table, self.quote_name), 

1670 name=self.quote_name( 

1671 self._create_index_name( 

1672 model._meta.db_table, [field.column], suffix="_pk" 

1673 ) 

1674 ), 

1675 columns=Columns(model._meta.db_table, [field.column], self.quote_name), 

1676 ) 

1677 

1678 def _delete_primary_key_sql(self, model, name): 

1679 return self._delete_constraint_sql(self.sql_delete_pk, model, name) 

1680 

1681 def _collate_sql(self, collation): 

1682 return "COLLATE " + self.quote_name(collation) 

1683 

1684 def remove_procedure(self, procedure_name, param_types=()): 

1685 sql = self.sql_delete_procedure % { 

1686 "procedure": self.quote_name(procedure_name), 

1687 "param_types": ",".join(param_types), 

1688 } 

1689 self.execute(sql)
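
# Illustrative sketch (not from schema.py), assuming a backend whose
# sql_delete_procedure template is along the lines of "DROP PROCEDURE %(procedure)s";
# the procedure name and parameter types are hypothetical, and the sketch is kept
# commented because it would drop a real database object.
# with connection.schema_editor() as editor:
#     editor.remove_procedure("archive_orders", param_types=("integer",))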