Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/factory/django.py: 44%
167 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1# Copyright: See the LICENSE file.
4"""factory_boy extensions for use with the Django framework."""
7import functools
8import io
9import logging
10import os
12from django.core import files as django_files
13from django.db import IntegrityError
15from . import base, declarations, errors
17logger = logging.getLogger('factory.generate')
20DEFAULT_DB_ALIAS = 'default' # Same as django.db.DEFAULT_DB_ALIAS
23_LAZY_LOADS = {}
26def get_model(app, model):
27 """Wrapper around django's get_model."""
28 if 'get_model' not in _LAZY_LOADS:
29 _lazy_load_get_model()
31 _get_model = _LAZY_LOADS['get_model']
32 return _get_model(app, model)
35def _lazy_load_get_model():
36 """Lazy loading of get_model.
38 get_model loads django.conf.settings, which may fail if
39 the settings haven't been configured yet.
40 """
41 from django import apps as django_apps
42 _LAZY_LOADS['get_model'] = django_apps.apps.get_model
45class DjangoOptions(base.FactoryOptions):
46 def _build_default_options(self):
47 return super()._build_default_options() + [
48 base.OptionDefault('django_get_or_create', (), inherit=True),
49 base.OptionDefault('database', DEFAULT_DB_ALIAS, inherit=True),
50 ]
52 def _get_counter_reference(self):
53 counter_reference = super()._get_counter_reference()
54 if (counter_reference == self.base_factory 54 ↛ 61line 54 didn't jump to line 61, because the condition on line 54 was never true
55 and self.base_factory._meta.model is not None
56 and self.base_factory._meta.model._meta.abstract
57 and self.model is not None
58 and not self.model._meta.abstract):
59 # Target factory is for an abstract model, yet we're for another,
60 # concrete subclass => don't reuse the counter.
61 return self.factory
62 return counter_reference
64 def get_model_class(self):
65 if isinstance(self.model, str) and '.' in self.model:
66 app, model_name = self.model.split('.', 1)
67 self.model = get_model(app, model_name)
69 return self.model
72class DjangoModelFactory(base.Factory):
73 """Factory for Django models.
75 This makes sure that the 'sequence' field of created objects is a new id.
77 Possible improvement: define a new 'attribute' type, AutoField, which would
78 handle those for non-numerical primary keys.
79 """
81 _options_class = DjangoOptions
83 class Meta:
84 abstract = True # Optional, but explicit.
86 @classmethod
87 def _load_model_class(cls, definition):
89 if isinstance(definition, str) and '.' in definition:
90 app, model = definition.split('.', 1)
91 return get_model(app, model)
93 return definition
95 @classmethod
96 def _get_manager(cls, model_class):
97 if model_class is None: 97 ↛ 98line 97 didn't jump to line 98, because the condition on line 97 was never true
98 raise errors.AssociatedClassError(
99 f"No model set on {cls.__module__}.{cls.__name__}.Meta")
101 try:
102 manager = model_class.objects
103 except AttributeError:
104 # When inheriting from an abstract model with a custom
105 # manager, the class has no 'objects' field.
106 manager = model_class._default_manager
108 if cls._meta.database != DEFAULT_DB_ALIAS: 108 ↛ 109line 108 didn't jump to line 109, because the condition on line 108 was never true
109 manager = manager.using(cls._meta.database)
110 return manager
112 @classmethod
113 def _generate(cls, strategy, params):
114 # Original params are used in _get_or_create if it cannot build an
115 # object initially due to an IntegrityError being raised
116 cls._original_params = params
117 return super()._generate(strategy, params)
119 @classmethod
120 def _get_or_create(cls, model_class, *args, **kwargs):
121 """Create an instance of the model through objects.get_or_create."""
122 manager = cls._get_manager(model_class)
124 assert 'defaults' not in cls._meta.django_get_or_create, (
125 "'defaults' is a reserved keyword for get_or_create "
126 "(in %s._meta.django_get_or_create=%r)"
127 % (cls, cls._meta.django_get_or_create))
129 key_fields = {}
130 for field in cls._meta.django_get_or_create:
131 if field not in kwargs:
132 raise errors.FactoryError(
133 "django_get_or_create - "
134 "Unable to find initialization value for '%s' in factory %s" %
135 (field, cls.__name__))
136 key_fields[field] = kwargs.pop(field)
137 key_fields['defaults'] = kwargs
139 try:
140 instance, _created = manager.get_or_create(*args, **key_fields)
141 except IntegrityError as e:
142 get_or_create_params = {
143 lookup: value
144 for lookup, value in cls._original_params.items()
145 if lookup in cls._meta.django_get_or_create
146 }
147 if get_or_create_params:
148 try:
149 instance = manager.get(**get_or_create_params)
150 except manager.model.DoesNotExist:
151 # Original params are not a valid lookup and triggered a create(),
152 # that resulted in an IntegrityError. Follow Django’s behavior.
153 raise e
154 else:
155 raise e
157 return instance
159 @classmethod
160 def _create(cls, model_class, *args, **kwargs):
161 """Create an instance of the model, and save it to the database."""
162 if cls._meta.django_get_or_create: 162 ↛ 163line 162 didn't jump to line 163, because the condition on line 162 was never true
163 return cls._get_or_create(model_class, *args, **kwargs)
165 manager = cls._get_manager(model_class)
166 return manager.create(*args, **kwargs)
168 @classmethod
169 def _after_postgeneration(cls, instance, create, results=None):
170 """Save again the instance if creating and at least one hook ran."""
171 if create and results:
172 # Some post-generation hooks ran, and may have modified us.
173 instance.save()
176class FileField(declarations.BaseDeclaration):
177 """Helper to fill in django.db.models.FileField from a Factory."""
179 DEFAULT_FILENAME = 'example.dat'
181 def _make_data(self, params):
182 """Create data for the field."""
183 return params.get('data', b'')
185 def _make_content(self, params):
186 path = ''
188 _content_params = [params.get('from_path'), params.get('from_file'), params.get('from_func')]
189 if len([p for p in _content_params if p]) > 1:
190 raise ValueError(
191 "At most one argument from 'from_file', 'from_path', and 'from_func' should "
192 "be non-empty when calling factory.django.FileField."
193 )
195 if params.get('from_path'):
196 path = params['from_path']
197 with open(path, 'rb') as f:
198 content = django_files.base.ContentFile(f.read())
200 elif params.get('from_file'):
201 f = params['from_file']
202 content = django_files.File(f)
203 path = content.name
205 elif params.get('from_func'):
206 func = params['from_func']
207 content = django_files.File(func())
208 path = content.name
210 else:
211 data = self._make_data(params)
212 content = django_files.base.ContentFile(data)
214 if path:
215 default_filename = os.path.basename(path)
216 else:
217 default_filename = self.DEFAULT_FILENAME
219 filename = params.get('filename', default_filename)
220 return filename, content
222 def evaluate(self, instance, step, extra):
223 """Fill in the field."""
224 filename, content = self._make_content(extra)
225 return django_files.File(content.file, filename)
228class ImageField(FileField):
229 DEFAULT_FILENAME = 'example.jpg'
231 def _make_data(self, params):
232 # ImageField (both django's and factory_boy's) require PIL.
233 # Try to import it along one of its known installation paths.
234 from PIL import Image
236 width = params.get('width', 100)
237 height = params.get('height', width)
238 color = params.get('color', 'blue')
239 image_format = params.get('format', 'JPEG')
240 image_palette = params.get('palette', 'RGB')
242 thumb_io = io.BytesIO()
243 with Image.new(image_palette, (width, height), color) as thumb:
244 thumb.save(thumb_io, format=image_format)
245 return thumb_io.getvalue()
248class mute_signals:
249 """Temporarily disables and then restores any django signals.
251 Args:
252 *signals (django.dispatch.dispatcher.Signal): any django signals
254 Examples:
255 with mute_signals(pre_init):
256 user = UserFactory.build()
257 ...
259 @mute_signals(pre_save, post_save)
260 class UserFactory(factory.Factory):
261 ...
263 @mute_signals(post_save)
264 def generate_users():
265 UserFactory.create_batch(10)
266 """
268 def __init__(self, *signals):
269 self.signals = signals
270 self.paused = {}
272 def __enter__(self):
273 for signal in self.signals:
274 logger.debug('mute_signals: Disabling signal handlers %r',
275 signal.receivers)
277 # Note that we're using implementation details of
278 # django.signals, since arguments to signal.connect()
279 # are lost in signal.receivers
280 self.paused[signal] = signal.receivers
281 signal.receivers = []
283 def __exit__(self, exc_type, exc_value, traceback):
284 for signal, receivers in self.paused.items():
285 logger.debug('mute_signals: Restoring signal handlers %r',
286 receivers)
288 signal.receivers += receivers
289 with signal.lock:
290 # Django uses some caching for its signals.
291 # Since we're bypassing signal.connect and signal.disconnect,
292 # we have to keep messing with django's internals.
293 signal.sender_receivers_cache.clear()
294 self.paused = {}
296 def copy(self):
297 return mute_signals(*self.signals)
299 def __call__(self, callable_obj):
300 if isinstance(callable_obj, base.FactoryMetaClass):
301 # Retrieve __func__, the *actual* callable object.
302 callable_obj._create = self.wrap_method(callable_obj._create.__func__)
303 callable_obj._generate = self.wrap_method(callable_obj._generate.__func__)
304 return callable_obj
306 else:
307 @functools.wraps(callable_obj)
308 def wrapper(*args, **kwargs):
309 # A mute_signals() object is not reentrant; use a copy every time.
310 with self.copy():
311 return callable_obj(*args, **kwargs)
312 return wrapper
314 def wrap_method(self, method):
315 @classmethod
316 @functools.wraps(method)
317 def wrapped_method(*args, **kwargs):
318 # A mute_signals() object is not reentrant; use a copy every time.
319 with self.copy():
320 return method(*args, **kwargs)
321 return wrapped_method