Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/db/backends/postgresql/schema.py: 69%

95 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1import psycopg2 

2 

3from django.db.backends.base.schema import BaseDatabaseSchemaEditor 

4from django.db.backends.ddl_references import IndexColumns 

5from django.db.backends.utils import strip_quotes 

6 

7 

8class DatabaseSchemaEditor(BaseDatabaseSchemaEditor): 

9 

# Statements used to manage the sequence that backs a serial column.
sql_create_sequence = "CREATE SEQUENCE %(sequence)s"
sql_delete_sequence = "DROP SEQUENCE IF EXISTS %(sequence)s CASCADE"
sql_set_sequence_max = (
    "SELECT setval('%(sequence)s', MAX(%(column)s)) "
    "FROM %(table)s"
)
sql_set_sequence_owner = (
    "ALTER SEQUENCE %(sequence)s "
    "OWNED BY %(table)s.%(column)s"
)

# Index creation/removal templates. The *_concurrently variants are the
# same statements with the CONCURRENTLY keyword added.
sql_create_index = (
    "CREATE INDEX %(name)s ON %(table)s%(using)s"
    " (%(columns)s)%(include)s%(extra)s%(condition)s"
)
sql_create_index_concurrently = (
    "CREATE INDEX CONCURRENTLY %(name)s ON %(table)s%(using)s"
    " (%(columns)s)%(include)s%(extra)s%(condition)s"
)
sql_delete_index = "DROP INDEX IF EXISTS %(name)s"
sql_delete_index_concurrently = "DROP INDEX CONCURRENTLY IF EXISTS %(name)s"

# The inline foreign key is switched to IMMEDIATE right after it is
# declared so that data can still be changed within the same transaction.
sql_create_column_inline_fk = (
    "CONSTRAINT %(name)s REFERENCES %(to_table)s(%(to_column)s)%(deferrable)s; "
    "SET CONSTRAINTS %(namespace)s%(name)s IMMEDIATE"
)
# Switching the constraint to IMMEDIATE first runs any deferred checks,
# which is what allows it to be dropped within the same transaction.
sql_delete_fk = (
    "SET CONSTRAINTS %(name)s IMMEDIATE;"
    " ALTER TABLE %(table)s DROP CONSTRAINT %(name)s"
)
sql_delete_procedure = "DROP FUNCTION %(procedure)s(%(param_types)s)"

41 

def quote_value(self, value):
    """
    Return ``value`` as a quoted SQL literal string.

    Percent signs in ``str`` values are doubled before the value is handed
    to ``psycopg2.extensions.adapt()``. When the resulting adapter exposes
    an ``encoding`` attribute it is forced to ``"utf8"``.
    """
    escaped = value.replace("%", "%%") if isinstance(value, str) else value
    wrapper = psycopg2.extensions.adapt(escaped)
    if hasattr(wrapper, "encoding"):
        wrapper.encoding = "utf8"
    # getquoted() hands back the quoted literal as bytes; callers get text.
    quoted_bytes = wrapper.getquoted()
    return quoted_bytes.decode()

50 

def _field_indexes_sql(self, model, field):
    """
    Return the index statements for ``field``: whatever the parent
    implementation produces, plus the pattern-ops ("_like") index when
    ``_create_like_index_sql()`` yields one.
    """
    statements = super()._field_indexes_sql(model, field)
    pattern_index = self._create_like_index_sql(model, field)
    if pattern_index is None:
        return statements
    statements.append(pattern_index)
    return statements

57 

58 def _field_data_type(self, field): 

59 if field.is_relation: 59 ↛ 60line 59 didn't jump to line 60, because the condition on line 59 was never true

60 return field.rel_db_type(self.connection) 

61 return self.connection.data_types.get( 

62 field.get_internal_type(), 

63 field.db_type(self.connection), 

64 ) 

65 

66 def _field_base_data_types(self, field): 

67 # Yield base data types for array fields. 

68 if field.base_field.get_internal_type() == "ArrayField": 

69 yield from self._field_base_data_types(field.base_field) 

70 else: 

71 yield self._field_data_type(field.base_field) 

72 

73 def _create_like_index_sql(self, model, field): 

74 """ 

75 Return the statement to create an index with varchar operator pattern 

76 when the column type is 'varchar' or 'text', otherwise return None. 

77 """ 

78 db_type = field.db_type(connection=self.connection) 

79 if db_type is not None and (field.db_index or field.unique): 

80 # Fields with database column types of `varchar` and `text` need 

81 # a second index that specifies their operator class, which is 

82 # needed when performing correct LIKE queries outside the 

83 # C locale. See #12234. 

84 # 

85 # The same doesn't apply to array fields such as varchar[size] 

86 # and text[size], so skip them. 

87 if "[" in db_type: 87 ↛ 88line 87 didn't jump to line 88, because the condition on line 87 was never true

88 return None 

89 if db_type.startswith("varchar"): 

90 return self._create_index_sql( 

91 model, 

92 fields=[field], 

93 suffix="_like", 

94 opclasses=["varchar_pattern_ops"], 

95 ) 

96 elif db_type.startswith("text"): 96 ↛ 97line 96 didn't jump to line 97, because the condition on line 96 was never true

97 return self._create_index_sql( 

98 model, 

99 fields=[field], 

100 suffix="_like", 

101 opclasses=["text_pattern_ops"], 

102 ) 

103 return None 

104 

def _alter_column_type_sql(self, model, old_field, new_field, new_type):
    """
    Return the SQL for changing a column's type.

    The result is a pair: the ``(sql, params)`` fragment altering the
    column, and a list of ``(sql, params)`` statements to run afterwards.
    ``self.sql_alter_column_type`` is reset on every call and gains a
    ``USING`` cast when the data type differs between the two fields.
    """
    self.sql_alter_column_type = "ALTER COLUMN %(column)s TYPE %(type)s"
    cast_clause = " USING %(column)s::%(type)s"
    new_kind = new_field.get_internal_type()
    old_kind = old_field.get_internal_type()
    if new_kind == old_kind == "ArrayField":
        # Two array fields are compared by their base data types.
        old_base_types = list(self._field_base_data_types(old_field))
        new_base_types = list(self._field_base_data_types(new_field))
        type_changed = old_base_types != new_base_types
    else:
        type_changed = self._field_data_type(old_field) != self._field_data_type(
            new_field
        )
    if type_changed:
        # Cast the existing values when the data type changed.
        self.sql_alter_column_type += cast_clause

    table = strip_quotes(model._meta.db_table)
    serial_to_integer = {
        "bigserial": "bigint",
        "serial": "integer",
        "smallserial": "smallint",
    }
    requested_type = new_type.lower()

    if requested_type in serial_to_integer:
        # Altering to a serial type: change the column to the matching
        # integer type, then rebuild its sequence by hand.
        column = strip_quotes(new_field.column)
        sequence_name = "%s_%s_seq" % (table, column)
        quoted_table = self.quote_name(table)
        quoted_column = self.quote_name(column)
        quoted_sequence = self.quote_name(sequence_name)
        alter_fragment = self.sql_alter_column_type % {
            "column": quoted_column,
            "type": serial_to_integer[requested_type],
        }
        set_default = self.sql_alter_column_default % {
            "column": quoted_column,
            "default": "nextval('%s')" % quoted_sequence,
        }
        sequence_target = {
            "table": quoted_table,
            "column": quoted_column,
            "sequence": quoted_sequence,
        }
        follow_ups = [
            (self.sql_delete_sequence % {"sequence": quoted_sequence}, []),
            (self.sql_create_sequence % {"sequence": quoted_sequence}, []),
            (
                self.sql_alter_column
                % {"table": quoted_table, "changes": set_default},
                [],
            ),
            (self.sql_set_sequence_max % sequence_target, []),
            (self.sql_set_sequence_owner % sequence_target, []),
        ]
        return (alter_fragment, []), follow_ups

    old_db_type = old_field.db_parameters(connection=self.connection)["type"]
    if old_db_type in serial_to_integer:
        # Migrating away from a serial column: alter the type as usual and
        # drop the now-unneeded sequence afterwards.
        column = strip_quotes(new_field.column)
        sequence_name = "%s_%s_seq" % (table, column)
        fragment, _ = super()._alter_column_type_sql(
            model, old_field, new_field, new_type
        )
        drop_sequence = self.sql_delete_sequence % {
            "sequence": self.quote_name(sequence_name),
        }
        return fragment, [(drop_sequence, [])]

    return super()._alter_column_type_sql(model, old_field, new_field, new_type)

207 

def _alter_field(
    self,
    model,
    old_field,
    new_field,
    old_type,
    new_type,
    old_db_params,
    new_db_params,
    strict=False,
):
    """
    Alter a field via the parent implementation while keeping the
    PostgreSQL-specific "_like" index in step with the change.

    The "_like" index is dropped up front when an indexed or unique
    varchar/text/citext column changes to another type, created afterwards
    when the field gains an index or uniqueness, and dropped afterwards
    when a unique field ends up neither indexed nor unique.
    """
    text_prefixes = ("varchar", "text", "citext")
    if (old_field.db_index or old_field.unique) and any(
        old_type.startswith(prefix) and not new_type.startswith(prefix)
        for prefix in text_prefixes
    ):
        # The column is leaving its text-like type, so drop the "_like"
        # index before the alteration.
        stale_like_index = self._create_index_name(
            model._meta.db_table, [old_field.column], suffix="_like"
        )
        self.execute(self._delete_index_sql(model, stale_like_index))

    super()._alter_field(
        model,
        old_field,
        new_field,
        old_type,
        new_type,
        old_db_params,
        new_db_params,
        strict,
    )

    # Added an index? Create any PostgreSQL-specific indexes.
    was_indexed = old_field.db_index or old_field.unique
    gained_index = not was_indexed and new_field.db_index
    gained_unique = not old_field.unique and new_field.unique
    if gained_index or gained_unique:
        like_index_statement = self._create_like_index_sql(model, new_field)
        if like_index_statement is not None:
            self.execute(like_index_statement)

    # Removed an index? Drop any PostgreSQL-specific indexes.
    if old_field.unique and not new_field.db_index and not new_field.unique:
        index_to_remove = self._create_index_name(
            model._meta.db_table, [old_field.column], suffix="_like"
        )
        self.execute(self._delete_index_sql(model, index_to_remove))

255 

def _index_columns(self, table, columns, col_suffixes, opclasses):
    """
    Return the column reference used in an index definition.

    With operator classes an ``IndexColumns`` reference is built; without
    them the parent implementation is used.
    """
    if not opclasses:
        return super()._index_columns(table, columns, col_suffixes, opclasses)
    return IndexColumns(
        table,
        columns,
        self.quote_name,
        col_suffixes=col_suffixes,
        opclasses=opclasses,
    )

266 

def add_index(self, model, index, concurrently=False):
    """
    Execute the statement ``index.create_sql()`` builds for ``model``,
    forwarding ``concurrently`` and passing ``params=None`` to execute().
    """
    statement = index.create_sql(model, self, concurrently=concurrently)
    self.execute(statement, params=None)

271 

def remove_index(self, model, index, concurrently=False):
    """
    Execute the statement ``index.remove_sql()`` builds for ``model``,
    forwarding ``concurrently``.
    """
    statement = index.remove_sql(model, self, concurrently=concurrently)
    self.execute(statement)

274 

def _delete_index_sql(self, model, name, sql=None, concurrently=False):
    """
    Return the statement that drops the index ``name``.

    An explicitly supplied ``sql`` template is passed through to the
    parent implementation. When none is given, the template is picked by
    ``concurrently``: ``sql_delete_index_concurrently`` if true, otherwise
    ``sql_delete_index``.
    """
    # Previously the ``sql`` argument was overwritten unconditionally, so
    # a caller-provided template was silently discarded.
    if not sql:
        sql = (
            self.sql_delete_index_concurrently
            if concurrently
            else self.sql_delete_index
        )
    return super()._delete_index_sql(model, name, sql)

282 

def _create_index_sql(
    self,
    model,
    *,
    fields=None,
    name=None,
    suffix="",
    using="",
    db_tablespace=None,
    col_suffixes=(),
    sql=None,
    opclasses=(),
    condition=None,
    concurrently=False,
    include=None,
    expressions=None,
):
    """
    Return the statement that creates an index on ``model``.

    An explicitly supplied ``sql`` template is honoured. When none is
    given, the template is picked by ``concurrently``:
    ``sql_create_index_concurrently`` if true, otherwise
    ``sql_create_index``. All other arguments are forwarded unchanged to
    the parent implementation; ``concurrently`` itself is not forwarded.
    """
    # Previously the ``sql`` argument was overwritten unconditionally, so
    # a caller-provided template was silently discarded.
    sql = sql or (
        self.sql_create_index
        if not concurrently
        else self.sql_create_index_concurrently
    )
    return super()._create_index_sql(
        model,
        fields=fields,
        name=name,
        suffix=suffix,
        using=using,
        db_tablespace=db_tablespace,
        col_suffixes=col_suffixes,
        sql=sql,
        opclasses=opclasses,
        condition=condition,
        include=include,
        expressions=expressions,
    )

316 include=include, 

317 expressions=expressions, 

318 )