Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/factory/django.py: 44%

167 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1# Copyright: See the LICENSE file. 

2 

3 

4"""factory_boy extensions for use with the Django framework.""" 

5 

6 

7import functools 

8import io 

9import logging 

10import os 

11 

12from django.core import files as django_files 

13from django.db import IntegrityError 

14 

15from . import base, declarations, errors 

16 

# Logger used for this module's debug output (e.g. by mute_signals).
logger = logging.getLogger('factory.generate')


# Same value as django.db.DEFAULT_DB_ALIAS.
DEFAULT_DB_ALIAS = 'default'


# Cache for callables that are imported from Django on first use; filled by
# _lazy_load_get_model() and read by get_model().
_LAZY_LOADS = {}

24 

25 

def get_model(app, model):
    """Look up a model class through Django's ``get_model``.

    The Django callable is loaded lazily and cached in ``_LAZY_LOADS`` the
    first time this wrapper is called.
    """
    try:
        resolver = _LAZY_LOADS['get_model']
    except KeyError:
        # First call: populate the cache, then read the entry back.
        _lazy_load_get_model()
        resolver = _LAZY_LOADS['get_model']
    return resolver(app, model)

33 

34 

def _lazy_load_get_model():
    """Store Django's ``apps.get_model`` in ``_LAZY_LOADS['get_model']``.

    The import is deferred to call time because get_model loads
    django.conf.settings, which may fail if the settings haven't been
    configured yet.
    """
    from django.apps import apps
    _LAZY_LOADS['get_model'] = apps.get_model

43 

44 

class DjangoOptions(base.FactoryOptions):
    """Factory options extended with the Django-specific ``Meta`` settings."""

    def _build_default_options(self):
        """Return the parent's defaults plus 'django_get_or_create' and 'database'."""
        inherited_options = super()._build_default_options()
        return inherited_options + [
            base.OptionDefault('django_get_or_create', (), inherit=True),
            base.OptionDefault('database', DEFAULT_DB_ALIAS, inherit=True),
        ]

    def _get_counter_reference(self):
        """Return the factory whose sequence counter this factory uses.

        The parent's choice is kept, except when it points at a base factory
        declared for an abstract model while this factory targets a concrete
        model: in that case this factory is returned instead.
        """
        inherited = super()._get_counter_reference()
        if inherited == self.base_factory:
            base_model = self.base_factory._meta.model
            base_is_abstract = base_model is not None and base_model._meta.abstract
            if base_is_abstract and self.model is not None and not self.model._meta.abstract:
                # Target factory is for an abstract model, yet we're for
                # another, concrete subclass => don't reuse the counter.
                return self.factory
        return inherited

    def get_model_class(self):
        """Return ``self.model``, first resolving an 'app.Model' string.

        A resolved class is written back to ``self.model``.
        """
        declared = self.model
        if isinstance(declared, str) and '.' in declared:
            app_label, _, model_name = declared.partition('.')
            self.model = get_model(app_label, model_name)

        return self.model

70 

71 

class DjangoModelFactory(base.Factory):
    """Factory for Django models.

    This makes sure that the 'sequence' field of created objects is a new id.

    Possible improvement: define a new 'attribute' type, AutoField, which would
    handle those for non-numerical primary keys.
    """

    _options_class = DjangoOptions

    # Parameters given to the most recent _generate() call. Defaults to None
    # so that _get_or_create() can detect that nothing was recorded instead
    # of failing with an AttributeError.
    _original_params = None

    class Meta:
        abstract = True  # Optional, but explicit.

    @classmethod
    def _load_model_class(cls, definition):
        """Resolve an 'app.Model' string to a model class.

        Any value that is not a string containing a dot is returned unchanged.
        """
        if isinstance(definition, str) and '.' in definition:
            app, model = definition.split('.', 1)
            return get_model(app, model)

        return definition

    @classmethod
    def _get_manager(cls, model_class):
        """Return the manager used to create instances of ``model_class``.

        Raises:
            errors.AssociatedClassError: if ``model_class`` is None.
        """
        if model_class is None:
            raise errors.AssociatedClassError(
                f"No model set on {cls.__module__}.{cls.__name__}.Meta")

        try:
            manager = model_class.objects
        except AttributeError:
            # When inheriting from an abstract model with a custom
            # manager, the class has no 'objects' field.
            manager = model_class._default_manager

        if cls._meta.database != DEFAULT_DB_ALIAS:
            manager = manager.using(cls._meta.database)
        return manager

    @classmethod
    def _generate(cls, strategy, params):
        """Record ``params`` on the class, then delegate to the parent."""
        # Original params are used in _get_or_create if it cannot build an
        # object initially due to an IntegrityError being raised
        cls._original_params = params
        return super()._generate(strategy, params)

    @classmethod
    def _get_or_create(cls, model_class, *args, **kwargs):
        """Create an instance of the model through objects.get_or_create.

        Fields listed in ``Meta.django_get_or_create`` become the lookup; all
        remaining keyword arguments are passed as ``defaults``.

        Raises:
            errors.FactoryError: if a ``django_get_or_create`` field has no
                value in ``kwargs``.
            IntegrityError: if get_or_create() fails and the instance cannot
                be fetched with the recorded original parameters.
        """
        manager = cls._get_manager(model_class)

        assert 'defaults' not in cls._meta.django_get_or_create, (
            "'defaults' is a reserved keyword for get_or_create "
            "(in %s._meta.django_get_or_create=%r)"
            % (cls, cls._meta.django_get_or_create))

        key_fields = {}
        for field in cls._meta.django_get_or_create:
            if field not in kwargs:
                raise errors.FactoryError(
                    "django_get_or_create - "
                    "Unable to find initialization value for '%s' in factory %s" %
                    (field, cls.__name__))
            key_fields[field] = kwargs.pop(field)
        key_fields['defaults'] = kwargs

        try:
            instance, _created = manager.get_or_create(*args, **key_fields)
        except IntegrityError as e:
            if cls._original_params is None:
                # No _generate() call recorded any parameters, so there is
                # nothing to retry the lookup with: surface the real error.
                raise

            get_or_create_params = {
                lookup: value
                for lookup, value in cls._original_params.items()
                if lookup in cls._meta.django_get_or_create
            }
            if not get_or_create_params:
                raise

            try:
                instance = manager.get(**get_or_create_params)
            except manager.model.DoesNotExist:
                # Original params are not a valid lookup and triggered a create(),
                # that resulted in an IntegrityError. Follow Django's behavior.
                raise e

        return instance

    @classmethod
    def _create(cls, model_class, *args, **kwargs):
        """Create an instance of the model, and save it to the database."""
        if cls._meta.django_get_or_create:
            return cls._get_or_create(model_class, *args, **kwargs)

        manager = cls._get_manager(model_class)
        return manager.create(*args, **kwargs)

    @classmethod
    def _after_postgeneration(cls, instance, create, results=None):
        """Save again the instance if creating and at least one hook ran."""
        if create and results:
            # Some post-generation hooks ran, and may have modified us.
            instance.save()

174 

175 

class FileField(declarations.BaseDeclaration):
    """Declaration that produces a value for a django.db.models.FileField."""

    DEFAULT_FILENAME = 'example.dat'

    def _make_data(self, params):
        """Return the raw payload: the 'data' parameter, or empty bytes."""
        return params.get('data', b'')

    def _make_content(self, params):
        """Build the ``(filename, content)`` pair described by ``params``.

        The content comes from 'from_path', 'from_file' or 'from_func' when
        one of them is set, and from ``_make_data`` otherwise. The filename
        is the 'filename' parameter if given; otherwise the basename of the
        source's path/name, or ``DEFAULT_FILENAME`` when there is none.

        Raises:
            ValueError: if more than one of the three sources is non-empty.
        """
        from_path = params.get('from_path')
        from_file = params.get('from_file')
        from_func = params.get('from_func')

        if sum(1 for source in (from_path, from_file, from_func) if source) > 1:
            raise ValueError(
                "At most one argument from 'from_file', 'from_path', and 'from_func' should "
                "be non-empty when calling factory.django.FileField."
            )

        source_name = ''
        if from_path:
            source_name = from_path
            with open(source_name, 'rb') as handle:
                content = django_files.base.ContentFile(handle.read())
        elif from_file:
            content = django_files.File(from_file)
            source_name = content.name
        elif from_func:
            content = django_files.File(from_func())
            source_name = content.name
        else:
            content = django_files.base.ContentFile(self._make_data(params))

        fallback = os.path.basename(source_name) if source_name else self.DEFAULT_FILENAME
        return params.get('filename', fallback), content

    def evaluate(self, instance, step, extra):
        """Fill in the field."""
        name, content = self._make_content(extra)
        return django_files.File(content.file, name)

226 

227 

class ImageField(FileField):
    """FileField variant whose generated payload is an image."""

    DEFAULT_FILENAME = 'example.jpg'

    def _make_data(self, params):
        """Render a single-colour image with PIL and return its encoded bytes.

        Reads 'width' (default 100), 'height' (default: the width), 'color'
        (default 'blue'), 'format' (default 'JPEG') and 'palette' (default
        'RGB') from ``params``.
        """
        # ImageField (both django's and factory_boy's) require PIL.
        from PIL import Image

        width = params.get('width', 100)
        dimensions = (width, params.get('height', width))
        fill = params.get('color', 'blue')
        encoding = params.get('format', 'JPEG')
        mode = params.get('palette', 'RGB')

        buffer = io.BytesIO()
        with Image.new(mode, dimensions, fill) as image:
            image.save(buffer, format=encoding)
        return buffer.getvalue()

246 

247 

class mute_signals:
    """Temporarily disables and then restores any django signals.

    Usable as a context manager, or as a decorator on a factory class or on
    a plain callable.

    Args:
        *signals (django.dispatch.dispatcher.Signal): any django signals

    Examples:
        with mute_signals(pre_init):
            user = UserFactory.build()
            ...

        @mute_signals(pre_save, post_save)
        class UserFactory(factory.Factory):
            ...

        @mute_signals(post_save)
        def generate_users():
            UserFactory.create_batch(10)
    """

    def __init__(self, *signals):
        self.signals = signals
        # Maps each muted signal to the receivers list it had on entry.
        self.paused = {}

    def __enter__(self):
        """Detach the receivers of every signal, remembering them for later."""
        for signal in self.signals:
            logger.debug('mute_signals: Disabling signal handlers %r',
                         signal.receivers)

            # Note that we're using implementation details of
            # django.signals, since arguments to signal.connect()
            # are lost in signal.receivers
            self.paused[signal] = signal.receivers
            signal.receivers = []

    def __exit__(self, exc_type, exc_value, traceback):
        """Re-attach the receivers that were detached by ``__enter__``."""
        for signal, receivers in self.paused.items():
            logger.debug('mute_signals: Restoring signal handlers %r',
                         receivers)

            # The saved receivers were connected first, so they go back in
            # front of anything that was connected while the signal was muted.
            signal.receivers = receivers + signal.receivers
            with signal.lock:
                # Django uses some caching for its signals.
                # Since we're bypassing signal.connect and signal.disconnect,
                # we have to keep messing with django's internals.
                signal.sender_receivers_cache.clear()
        self.paused = {}

    def copy(self):
        """Return a fresh mute_signals for the same signals."""
        return mute_signals(*self.signals)

    def __call__(self, callable_obj):
        """Decorate a factory class or a plain callable.

        For a factory class, ``_create`` and ``_generate`` are replaced by
        muted versions and the class itself is returned. Any other callable
        is wrapped in a function that runs it with the signals muted.
        """
        if isinstance(callable_obj, base.FactoryMetaClass):
            # Retrieve __func__, the *actual* callable object.
            callable_obj._create = self.wrap_method(callable_obj._create.__func__)
            callable_obj._generate = self.wrap_method(callable_obj._generate.__func__)
            return callable_obj

        else:
            @functools.wraps(callable_obj)
            def wrapper(*args, **kwargs):
                # A mute_signals() object is not reentrant; use a copy every time.
                with self.copy():
                    return callable_obj(*args, **kwargs)
            return wrapper

    def wrap_method(self, method):
        """Return ``method`` as a classmethod that runs with the signals muted."""
        @classmethod
        @functools.wraps(method)
        def wrapped_method(*args, **kwargs):
            # A mute_signals() object is not reentrant; use a copy every time.
            with self.copy():
                return method(*args, **kwargs)
        return wrapped_method