Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/db/models/indexes.py: 51%

140 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1from django.db.backends.utils import names_digest, split_identifier 

2from django.db.models.expressions import Col, ExpressionList, F, Func, OrderBy 

3from django.db.models.functions import Collate 

4from django.db.models.query_utils import Q 

5from django.db.models.sql import Query 

6from django.utils.functional import partition 

7 

# Names exported by a star-import of this module; only ``Index`` is listed.
__all__ = ["Index"]

9 

10 

class Index:
    """Describe a database index attached to a model.

    An index is built either from field names (a leading ``-`` marks a
    descending column) or from positional expressions, never from both.
    """

    suffix = "idx"
    # Index names are capped at 30 characters so that they remain usable
    # across databases; Oracle is the backend imposing this limit.
    max_name_length = 30

    def __init__(
        self,
        *expressions,
        fields=(),
        name=None,
        db_tablespace=None,
        opclasses=(),
        condition=None,
        include=None,
    ):
        """Validate the arguments and store the index definition.

        String entries in ``expressions`` are wrapped in ``F()``. ``fields``
        and ``opclasses`` must be lists or tuples, ``condition`` must be a
        ``Q`` instance or ``None``, and ``include`` must be a list, a tuple
        or ``None``.

        Raises:
            ValueError: if any rule below is violated. The checks run in a
                fixed order, so the first failing rule decides the message.
        """
        sequence_types = (list, tuple)

        if opclasses and not name:
            raise ValueError("An index must be named to use opclasses.")
        if condition is not None and not isinstance(condition, Q):
            raise ValueError("Index.condition must be a Q instance.")
        if condition and not name:
            raise ValueError("An index must be named to use condition.")
        if not isinstance(fields, sequence_types):
            raise ValueError("Index.fields must be a list or tuple.")
        if not isinstance(opclasses, sequence_types):
            raise ValueError("Index.opclasses must be a list or tuple.")
        if not (expressions or fields):
            raise ValueError(
                "At least one field or expression is required to define an index."
            )
        if expressions and fields:
            raise ValueError(
                "Index.fields and expressions are mutually exclusive.",
            )
        if expressions:
            # Expression indexes need an explicit name and cannot be
            # combined with the opclasses argument.
            if not name:
                raise ValueError("An index must be named to use expressions.")
            if opclasses:
                raise ValueError(
                    "Index.opclasses cannot be used with expressions. Use "
                    "django.contrib.postgres.indexes.OpClass() instead."
                )
        if opclasses and len(fields) != len(opclasses):
            raise ValueError(
                "Index.fields and Index.opclasses must have the same number of "
                "elements."
            )
        if any(not isinstance(entry, str) for entry in fields):
            raise ValueError("Index.fields must contain only strings with field names.")
        if include and not name:
            raise ValueError("A covering index must be named.")
        if include is not None and not isinstance(include, sequence_types):
            raise ValueError("Index.include must be a list or tuple.")

        self.fields = list(fields)
        # Pairs of (field name, ordering) where ordering is '' or 'DESC'.
        orders = []
        for entry in self.fields:
            if entry.startswith("-"):
                orders.append((entry[1:], "DESC"))
            else:
                orders.append((entry, ""))
        self.fields_orders = orders
        self.name = name or ""
        self.db_tablespace = db_tablespace
        self.opclasses = opclasses
        self.condition = condition
        self.include = tuple(include) if include else ()
        self.expressions = tuple(
            F(item) if isinstance(item, str) else item for item in expressions
        )

    @property
    def contains_expressions(self):
        """Return True when the index was defined with expressions."""
        return True if self.expressions else False

    def _get_condition_sql(self, model, schema_editor):
        """Return the SQL for the index condition, or None without one.

        The parameters produced while compiling the condition are quoted
        with ``schema_editor.quote_value()`` and interpolated into the SQL.
        """
        if self.condition is None:
            return None
        connection = schema_editor.connection
        query = Query(model=model, alias_cols=False)
        where_node = query.build_where(self.condition)
        compiler = query.get_compiler(connection=connection)
        sql, params = where_node.as_sql(compiler, connection)
        quoted_params = tuple(schema_editor.quote_value(param) for param in params)
        return sql % quoted_params

    def create_sql(self, model, schema_editor, using="", **kwargs):
        """Return what ``schema_editor._create_index_sql()`` builds for this index.

        Extra keyword arguments are forwarded to the schema editor untouched.
        """
        include = [
            model._meta.get_field(included_name).column
            for included_name in self.include
        ]
        condition = self._get_condition_sql(model, schema_editor)
        if not self.expressions:
            # Field-based index: hand over the model fields and their
            # ordering suffixes.
            fields = [
                model._meta.get_field(plain_name)
                for plain_name, _ in self.fields_orders
            ]
            col_suffixes = [direction for _, direction in self.fields_orders]
            expressions = None
        else:
            # Expression-based index: wrap every expression before
            # resolving the whole list against the model.
            wrapped = []
            for raw_expression in self.expressions:
                wrapper = IndexExpression(raw_expression)
                wrapper.set_wrapper_classes(schema_editor.connection)
                wrapped.append(wrapper)
            expressions = ExpressionList(*wrapped).resolve_expression(
                Query(model, alias_cols=False),
            )
            fields = None
            col_suffixes = None
        return schema_editor._create_index_sql(
            model,
            fields=fields,
            name=self.name,
            using=using,
            db_tablespace=self.db_tablespace,
            col_suffixes=col_suffixes,
            opclasses=self.opclasses,
            condition=condition,
            include=include,
            expressions=expressions,
            **kwargs,
        )

    def remove_sql(self, model, schema_editor, **kwargs):
        """Return what ``schema_editor._delete_index_sql()`` builds for this index."""
        index_name = self.name
        return schema_editor._delete_index_sql(model, index_name, **kwargs)

    def deconstruct(self):
        """Return a ``(path, args, kwargs)`` triple describing this index.

        ``kwargs`` always carries the name; every other option is added only
        when it is set.
        """
        cls = self.__class__
        path = "%s.%s" % (cls.__module__, cls.__name__)
        # Report the shorter public path for classes defined in this module.
        path = path.replace("django.db.models.indexes", "django.db.models")
        kwargs = {"name": self.name}
        if self.fields:
            kwargs["fields"] = self.fields
        if self.db_tablespace is not None:
            kwargs["db_tablespace"] = self.db_tablespace
        if self.opclasses:
            kwargs["opclasses"] = self.opclasses
        if self.condition:
            kwargs["condition"] = self.condition
        if self.include:
            kwargs["include"] = self.include
        return (path, self.expressions, kwargs)

    def clone(self):
        """Create a copy of this Index."""
        _path, positional, keywords = self.deconstruct()
        return self.__class__(*positional, **keywords)

    def set_name_with_model(self, model):
        """
        Generate a unique name for the index and store it in ``self.name``.

        The name joins three parts with underscores: the start of the table
        name, the start of the first column name, and a hash followed by
        the suffix. The first two parts are truncated to fit.

        Raises ValueError when the result exceeds ``max_name_length``.
        """
        _, table_name = split_identifier(model._meta.db_table)
        column_names = []
        column_names_with_order = []
        for field_name, order in self.fields_orders:
            column = model._meta.get_field(field_name).column
            column_names.append(column)
            # Descending columns are prefixed with "-" in the hashed data.
            column_names_with_order.append(("-%s" if order else "%s") % column)
        # The length of the parts of the name is based on the default max
        # length of 30 characters.
        hash_data = [table_name] + column_names_with_order + [self.suffix]
        digest_and_suffix = "%s_%s" % (
            names_digest(*hash_data, length=6),
            self.suffix,
        )
        self.name = "%s_%s_%s" % (
            table_name[:11],
            column_names[0][:7],
            digest_and_suffix,
        )
        if len(self.name) > self.max_name_length:
            raise ValueError(
                "Index too long for multiple database support. Is self.suffix "
                "longer than 3 characters?"
            )
        first_char = self.name[0]
        if first_char == "_" or first_char.isdigit():
            # Replace a leading underscore or digit with the letter "D".
            self.name = "D%s" % self.name[1:]

    def __repr__(self):
        """Return a debug representation that lists only the options set."""
        fields_part = " fields=%s" % repr(self.fields) if self.fields else ""
        expressions_part = (
            " expressions=%s" % repr(self.expressions) if self.expressions else ""
        )
        name_part = " name=%s" % repr(self.name) if self.name else ""
        tablespace_part = (
            " db_tablespace=%s" % repr(self.db_tablespace)
            if self.db_tablespace is not None
            else ""
        )
        condition_part = (
            " condition=%s" % self.condition if self.condition is not None else ""
        )
        include_part = " include=%s" % repr(self.include) if self.include else ""
        opclasses_part = (
            " opclasses=%s" % repr(self.opclasses) if self.opclasses else ""
        )
        return "<%s:%s%s%s%s%s%s%s>" % (
            self.__class__.__qualname__,
            fields_part,
            expressions_part,
            name_part,
            tablespace_part,
            condition_part,
            include_part,
            opclasses_part,
        )

    def __eq__(self, other):
        """Compare by deconstructed form; defer to ``other`` for a different class."""
        if not (self.__class__ == other.__class__):
            return NotImplemented
        return self.deconstruct() == other.deconstruct()

206 

207 

class IndexExpression(Func):
    """Order and wrap expressions for CREATE INDEX statements."""

    template = "%(expressions)s"
    # Expression types treated as wrappers around the indexed expression;
    # their order here is the nesting order applied when resolving.
    wrapper_classes = (OrderBy, Collate)

    def set_wrapper_classes(self, connection=None):
        """Drop ``Collate`` from this instance's wrappers when the backend
        flags ``collate_as_index_expression``."""
        if not connection or not connection.features.collate_as_index_expression:
            return
        # Some databases (e.g. MySQL) treats COLLATE as an indexed expression.
        self.wrapper_classes = tuple(
            candidate
            for candidate in self.wrapper_classes
            if candidate is not Collate
        )

    @classmethod
    def register_wrappers(cls, *wrapper_classes):
        """Replace the class-level wrapper classes with the given ones."""
        cls.wrapper_classes = wrapper_classes

    def resolve_expression(
        self,
        query=None,
        allow_joins=True,
        reuse=None,
        summarize=False,
        for_save=False,
    ):
        """Reorder the wrappers around the root expression, then resolve.

        Raises ValueError when a wrapper type occurs more than once or when
        the wrappers are not the topmost expressions.
        """
        flattened = list(self.flatten())
        # Split expressions and wrappers.
        index_expressions, wrappers = partition(
            lambda node: isinstance(node, self.wrapper_classes),
            flattened,
        )
        seen_types = [type(wrapper) for wrapper in wrappers]
        if len(set(seen_types)) != len(seen_types):
            raise ValueError(
                "Multiple references to %s can't be used in an indexed "
                "expression."
                % ", ".join(
                    [wrapper_cls.__qualname__ for wrapper_cls in self.wrapper_classes]
                )
            )
        # Position 0 is skipped here and for the root expression below;
        # presumably flatten() yields this node first — confirm against
        # the flatten() implementation.
        topmost = flattened[1 : len(wrappers) + 1]
        if topmost != wrappers:
            raise ValueError(
                "%s must be topmost expressions in an indexed expression."
                % ", ".join(
                    [wrapper_cls.__qualname__ for wrapper_cls in self.wrapper_classes]
                )
            )
        # Wrap expressions in parentheses if they are not column references.
        root_expression = index_expressions[1]
        resolved_root = root_expression.resolve_expression(
            query,
            allow_joins,
            reuse,
            summarize,
            for_save,
        )
        if not isinstance(resolved_root, Col):
            root_expression = Func(root_expression, template="(%(expressions)s)")

        if not wrappers:
            # Use the root expression, if there are no wrappers.
            self.set_source_expressions([root_expression])
        else:
            # Order wrappers by their position in wrapper_classes and work
            # on copies so the original expressions stay untouched.
            ordered = sorted(
                wrappers,
                key=lambda wrapper: self.wrapper_classes.index(type(wrapper)),
            )
            ordered = [wrapper.copy() for wrapper in ordered]
            # Nest each wrapper inside the one preceding it.
            for outer, inner in zip(ordered, ordered[1:]):
                outer.set_source_expressions([inner])
            # Set the root expression on the deepest wrapper.
            ordered[-1].set_source_expressions([root_expression])
            self.set_source_expressions([ordered[0]])
        return super().resolve_expression(
            query, allow_joins, reuse, summarize, for_save
        )

    def as_sqlite(self, compiler, connection, **extra_context):
        """Compile with the generic ``as_sql()`` when the backend is SQLite."""
        # Casting to numeric is unnecessary.
        return self.as_sql(compiler, connection, **extra_context)