Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/db/models/constraints.py: 22%

151 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1from enum import Enum 

2 

3from django.db.models.expressions import ExpressionList, F 

4from django.db.models.indexes import IndexExpression 

5from django.db.models.query_utils import Q 

6from django.db.models.sql.query import Query 

7 

8__all__ = ["CheckConstraint", "Deferrable", "UniqueConstraint"] 

9 

10 

11class BaseConstraint: 

12 def __init__(self, name): 

13 self.name = name 

14 

15 @property 

16 def contains_expressions(self): 

17 return False 

18 

19 def constraint_sql(self, model, schema_editor): 

20 raise NotImplementedError("This method must be implemented by a subclass.") 

21 

22 def create_sql(self, model, schema_editor): 

23 raise NotImplementedError("This method must be implemented by a subclass.") 

24 

25 def remove_sql(self, model, schema_editor): 

26 raise NotImplementedError("This method must be implemented by a subclass.") 

27 

28 def deconstruct(self): 

29 path = "%s.%s" % (self.__class__.__module__, self.__class__.__name__) 

30 path = path.replace("django.db.models.constraints", "django.db.models") 

31 return (path, (), {"name": self.name}) 

32 

33 def clone(self): 

34 _, args, kwargs = self.deconstruct() 

35 return self.__class__(*args, **kwargs) 

36 

37 

38class CheckConstraint(BaseConstraint): 

39 def __init__(self, *, check, name): 

40 self.check = check 

41 if not getattr(check, "conditional", False): 

42 raise TypeError( 

43 "CheckConstraint.check must be a Q instance or boolean expression." 

44 ) 

45 super().__init__(name) 

46 

47 def _get_check_sql(self, model, schema_editor): 

48 query = Query(model=model, alias_cols=False) 

49 where = query.build_where(self.check) 

50 compiler = query.get_compiler(connection=schema_editor.connection) 

51 sql, params = where.as_sql(compiler, schema_editor.connection) 

52 return sql % tuple(schema_editor.quote_value(p) for p in params) 

53 

54 def constraint_sql(self, model, schema_editor): 

55 check = self._get_check_sql(model, schema_editor) 

56 return schema_editor._check_sql(self.name, check) 

57 

58 def create_sql(self, model, schema_editor): 

59 check = self._get_check_sql(model, schema_editor) 

60 return schema_editor._create_check_sql(model, self.name, check) 

61 

62 def remove_sql(self, model, schema_editor): 

63 return schema_editor._delete_check_sql(model, self.name) 

64 

65 def __repr__(self): 

66 return "<%s: check=%s name=%s>" % ( 

67 self.__class__.__qualname__, 

68 self.check, 

69 repr(self.name), 

70 ) 

71 

72 def __eq__(self, other): 

73 if isinstance(other, CheckConstraint): 

74 return self.name == other.name and self.check == other.check 

75 return super().__eq__(other) 

76 

77 def deconstruct(self): 

78 path, args, kwargs = super().deconstruct() 

79 kwargs["check"] = self.check 

80 return path, args, kwargs 

81 

82 

83class Deferrable(Enum): 

84 DEFERRED = "deferred" 

85 IMMEDIATE = "immediate" 

86 

87 # A similar format was proposed for Python 3.10. 

88 def __repr__(self): 

89 return f"{self.__class__.__qualname__}.{self._name_}" 

90 

91 

92class UniqueConstraint(BaseConstraint): 

93 def __init__( 

94 self, 

95 *expressions, 

96 fields=(), 

97 name=None, 

98 condition=None, 

99 deferrable=None, 

100 include=None, 

101 opclasses=(), 

102 ): 

103 if not name: 

104 raise ValueError("A unique constraint must be named.") 

105 if not expressions and not fields: 

106 raise ValueError( 

107 "At least one field or expression is required to define a " 

108 "unique constraint." 

109 ) 

110 if expressions and fields: 

111 raise ValueError( 

112 "UniqueConstraint.fields and expressions are mutually exclusive." 

113 ) 

114 if not isinstance(condition, (type(None), Q)): 

115 raise ValueError("UniqueConstraint.condition must be a Q instance.") 

116 if condition and deferrable: 

117 raise ValueError("UniqueConstraint with conditions cannot be deferred.") 

118 if include and deferrable: 

119 raise ValueError("UniqueConstraint with include fields cannot be deferred.") 

120 if opclasses and deferrable: 

121 raise ValueError("UniqueConstraint with opclasses cannot be deferred.") 

122 if expressions and deferrable: 

123 raise ValueError("UniqueConstraint with expressions cannot be deferred.") 

124 if expressions and opclasses: 

125 raise ValueError( 

126 "UniqueConstraint.opclasses cannot be used with expressions. " 

127 "Use django.contrib.postgres.indexes.OpClass() instead." 

128 ) 

129 if not isinstance(deferrable, (type(None), Deferrable)): 

130 raise ValueError( 

131 "UniqueConstraint.deferrable must be a Deferrable instance." 

132 ) 

133 if not isinstance(include, (type(None), list, tuple)): 

134 raise ValueError("UniqueConstraint.include must be a list or tuple.") 

135 if not isinstance(opclasses, (list, tuple)): 

136 raise ValueError("UniqueConstraint.opclasses must be a list or tuple.") 

137 if opclasses and len(fields) != len(opclasses): 

138 raise ValueError( 

139 "UniqueConstraint.fields and UniqueConstraint.opclasses must " 

140 "have the same number of elements." 

141 ) 

142 self.fields = tuple(fields) 

143 self.condition = condition 

144 self.deferrable = deferrable 

145 self.include = tuple(include) if include else () 

146 self.opclasses = opclasses 

147 self.expressions = tuple( 

148 F(expression) if isinstance(expression, str) else expression 

149 for expression in expressions 

150 ) 

151 super().__init__(name) 

152 

153 @property 

154 def contains_expressions(self): 

155 return bool(self.expressions) 

156 

157 def _get_condition_sql(self, model, schema_editor): 

158 if self.condition is None: 

159 return None 

160 query = Query(model=model, alias_cols=False) 

161 where = query.build_where(self.condition) 

162 compiler = query.get_compiler(connection=schema_editor.connection) 

163 sql, params = where.as_sql(compiler, schema_editor.connection) 

164 return sql % tuple(schema_editor.quote_value(p) for p in params) 

165 

166 def _get_index_expressions(self, model, schema_editor): 

167 if not self.expressions: 

168 return None 

169 index_expressions = [] 

170 for expression in self.expressions: 

171 index_expression = IndexExpression(expression) 

172 index_expression.set_wrapper_classes(schema_editor.connection) 

173 index_expressions.append(index_expression) 

174 return ExpressionList(*index_expressions).resolve_expression( 

175 Query(model, alias_cols=False), 

176 ) 

177 

178 def constraint_sql(self, model, schema_editor): 

179 fields = [model._meta.get_field(field_name) for field_name in self.fields] 

180 include = [ 

181 model._meta.get_field(field_name).column for field_name in self.include 

182 ] 

183 condition = self._get_condition_sql(model, schema_editor) 

184 expressions = self._get_index_expressions(model, schema_editor) 

185 return schema_editor._unique_sql( 

186 model, 

187 fields, 

188 self.name, 

189 condition=condition, 

190 deferrable=self.deferrable, 

191 include=include, 

192 opclasses=self.opclasses, 

193 expressions=expressions, 

194 ) 

195 

196 def create_sql(self, model, schema_editor): 

197 fields = [model._meta.get_field(field_name) for field_name in self.fields] 

198 include = [ 

199 model._meta.get_field(field_name).column for field_name in self.include 

200 ] 

201 condition = self._get_condition_sql(model, schema_editor) 

202 expressions = self._get_index_expressions(model, schema_editor) 

203 return schema_editor._create_unique_sql( 

204 model, 

205 fields, 

206 self.name, 

207 condition=condition, 

208 deferrable=self.deferrable, 

209 include=include, 

210 opclasses=self.opclasses, 

211 expressions=expressions, 

212 ) 

213 

214 def remove_sql(self, model, schema_editor): 

215 condition = self._get_condition_sql(model, schema_editor) 

216 include = [ 

217 model._meta.get_field(field_name).column for field_name in self.include 

218 ] 

219 expressions = self._get_index_expressions(model, schema_editor) 

220 return schema_editor._delete_unique_sql( 

221 model, 

222 self.name, 

223 condition=condition, 

224 deferrable=self.deferrable, 

225 include=include, 

226 opclasses=self.opclasses, 

227 expressions=expressions, 

228 ) 

229 

230 def __repr__(self): 

231 return "<%s:%s%s%s%s%s%s%s>" % ( 

232 self.__class__.__qualname__, 

233 "" if not self.fields else " fields=%s" % repr(self.fields), 

234 "" if not self.expressions else " expressions=%s" % repr(self.expressions), 

235 " name=%s" % repr(self.name), 

236 "" if self.condition is None else " condition=%s" % self.condition, 

237 "" if self.deferrable is None else " deferrable=%r" % self.deferrable, 

238 "" if not self.include else " include=%s" % repr(self.include), 

239 "" if not self.opclasses else " opclasses=%s" % repr(self.opclasses), 

240 ) 

241 

242 def __eq__(self, other): 

243 if isinstance(other, UniqueConstraint): 

244 return ( 

245 self.name == other.name 

246 and self.fields == other.fields 

247 and self.condition == other.condition 

248 and self.deferrable == other.deferrable 

249 and self.include == other.include 

250 and self.opclasses == other.opclasses 

251 and self.expressions == other.expressions 

252 ) 

253 return super().__eq__(other) 

254 

255 def deconstruct(self): 

256 path, args, kwargs = super().deconstruct() 

257 if self.fields: 

258 kwargs["fields"] = self.fields 

259 if self.condition: 

260 kwargs["condition"] = self.condition 

261 if self.deferrable: 

262 kwargs["deferrable"] = self.deferrable 

263 if self.include: 

264 kwargs["include"] = self.include 

265 if self.opclasses: 

266 kwargs["opclasses"] = self.opclasses 

267 return path, self.expressions, kwargs