Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django_fsm/__init__.py: 50%
329 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1# -*- coding: utf-8 -*-
2"""
3State tracking functionality for django models
4"""
5import inspect
6import sys
7from functools import wraps
9import django
10from django.db import models
11from django.db.models import Field
12from django.db.models.query_utils import DeferredAttribute
13from django.db.models.signals import class_prepared
14from django_fsm.signals import pre_transition, post_transition
16try:
17 from functools import partialmethod
18except ImportError:
19 # python 2.7, so we are on django<=1.11
20 from django.utils.functional import curry as partialmethod
# Model lookup shim: prefer the app registry from ``django.apps``; when that
# module cannot be imported, fall back to ``django.db.models.loading``.
try:
    from django.apps import apps as django_apps
except ImportError:
    from django.db.models.loading import get_model
else:

    def get_model(app_label, model_name):
        """Return the model ``model_name`` from the app config ``app_label``."""
        return django_apps.get_app_config(app_label).get_model(model_name)
# Names exported by ``from django_fsm import *``.
__all__ = [
    "TransitionNotAllowed",
    "ConcurrentTransition",
    "FSMFieldMixin",
    "FSMField",
    "FSMIntegerField",
    "FSMKeyField",
    "ConcurrentTransitionMixin",
    "transition",
    "can_proceed",
    "has_transition_perm",
    "GET_STATE",
    "RETURN_VALUE",
]
if sys.version_info[:2] == (2, 6):
    # Python 2.6 ships a buggy inspect.getmembers, so a backport of the
    # Python 2.7 implementation is installed in its place.
    def __getmembers(object, predicate=None):
        """Return all members of an object as (name, value) pairs sorted by name.

        When ``predicate`` is given, only members whose value satisfies it
        are returned."""
        members = []
        for attr_name in dir(object):
            try:
                attr_value = getattr(object, attr_name)
            except AttributeError:
                # Name is listed by dir() but cannot be fetched; skip it.
                continue
            if predicate and not predicate(attr_value):
                continue
            members.append((attr_name, attr_value))
        members.sort()
        return members

    inspect.getmembers = __getmembers
# South support; see http://south.aeracode.org/docs/tutorial/part4.html#simple-inheritance
# South is optional: when it cannot be imported no rules are registered.
try:
    from south.modelsinspector import add_introspection_rules
except ImportError:
    pass
else:
    for _south_pattern in (
        r"^django_fsm\.FSMField",
        r"^django_fsm\.FSMIntegerField",
        r"^django_fsm\.FSMKeyField",
    ):
        add_introspection_rules([], [_south_pattern])
    # Do not leave the loop variable behind as a module attribute.
    del _south_pattern
class TransitionNotAllowed(Exception):
    """Raised when a transition is not allowed.

    The optional ``object`` and ``method`` keyword arguments are removed from
    ``kwargs`` and stored as attributes of the same name (``None`` when not
    supplied); all remaining arguments are forwarded to ``Exception``.
    """

    def __init__(self, *args, **kwargs):
        self.object, self.method = [kwargs.pop(key, None) for key in ("object", "method")]
        super(TransitionNotAllowed, self).__init__(*args, **kwargs)
class InvalidResultState(Exception):
    """Raised when a transition produces a result state that is not valid."""
class ConcurrentTransition(Exception):
    """
    Raised when a transition cannot be executed because the object is
    stale, i.e. its state has been changed since it was fetched from the
    database.
    """
class Transition(object):
    """Plain record describing one transition of a method from ``source`` to ``target``."""

    def __init__(self, method, source, target, on_error, conditions, permission, custom):
        self.method, self.source, self.target = method, source, target
        self.on_error = on_error
        self.conditions = conditions
        self.permission = permission
        self.custom = custom

    @property
    def name(self):
        """Name of the wrapped method (its ``__name__``)."""
        return self.method.__name__

    def has_perm(self, instance, user):
        """Return True when ``user`` may run this transition on ``instance``.

        A falsy ``permission`` allows everyone. A callable permission is
        called as ``permission(instance, user)``. Any other value is checked
        with ``user.has_perm``, first against ``instance`` and then without
        an object.
        """
        required = self.permission
        if not required:
            return True
        if callable(required):
            return bool(required(instance, user))
        return bool(user.has_perm(required, instance) or user.has_perm(required))
def get_available_FIELD_transitions(instance, field):
    """
    Yield the transitions that can be taken from the current state of
    ``instance`` on ``field`` and whose conditions are all met.
    """
    state = field.get_state(instance)

    for method in field.transitions[instance.__class__].values():
        fsm_meta = method._django_fsm
        if not fsm_meta.has_transition(state):
            continue
        if fsm_meta.conditions_met(instance, state):
            yield fsm_meta.get_transition(state)
def get_all_FIELD_transitions(instance, field):
    """
    Return every transition declared on ``field`` for the class of
    ``instance``, as produced by ``field.get_all_transitions``.
    """
    model_cls = instance.__class__
    return field.get_all_transitions(model_cls)
def get_available_user_FIELD_transitions(instance, user, field):
    """
    Yield the transitions available from the current state of ``instance``
    (all conditions met) that ``user`` also has permission to run.
    """
    for candidate in get_available_FIELD_transitions(instance, field):
        if not candidate.has_perm(instance, user):
            continue
        yield candidate
class FSMMeta(object):
    """
    Transition meta information attached to a model method.

    ``transitions`` maps a source state to the ``Transition`` taken from
    that state. The special keys ``"*"`` and ``"+"`` act as fallbacks when
    there is no entry for the exact state.
    """

    def __init__(self, field, method):
        # ``method`` is accepted for interface compatibility but not stored.
        self.field = field
        self.transitions = {}  # source -> Transition

    def get_transition(self, source):
        """Return the transition for ``source``, falling back to ``"*"`` then ``"+"``.

        Returns ``None`` when none of the three keys is registered.
        """
        transition = self.transitions.get(source, None)
        if transition is None:
            transition = self.transitions.get("*", None)
        if transition is None:
            transition = self.transitions.get("+", None)
        return transition

    def add_transition(self, method, source, target, on_error=None, conditions=None, permission=None, custom=None):
        """Register a transition from ``source``.

        ``conditions`` and ``custom`` default to a fresh empty list / dict
        for every call, so transitions never share a container created as a
        default argument.

        Raises:
            AssertionError: a transition for ``source`` is already registered.
        """
        if source in self.transitions:
            raise AssertionError("Duplicate transition for {0} state".format(source))

        self.transitions[source] = Transition(
            method=method,
            source=source,
            target=target,
            on_error=on_error,
            conditions=[] if conditions is None else conditions,
            permission=permission,
            custom={} if custom is None else custom,
        )

    def has_transition(self, state):
        """
        Lookup if any transition exists from current model state using current method
        """
        if state in self.transitions:
            return True

        if "*" in self.transitions:
            return True

        # "+" matches every state except the transition's own target.
        if "+" in self.transitions and self.transitions["+"].target != state:
            return True

        return False

    def conditions_met(self, instance, state):
        """
        Check if all conditions have been met

        Returns False when no transition exists for ``state`` and True when
        the transition has no conditions.
        """
        transition = self.get_transition(state)

        if transition is None:
            return False
        if transition.conditions is None:
            return True
        return all(condition(instance) for condition in transition.conditions)

    def has_transition_perm(self, instance, state, user):
        """Return whether ``user`` may run the transition from ``state`` on ``instance``."""
        transition = self.get_transition(state)

        if not transition:
            return False
        return transition.has_perm(instance, user)

    def next_state(self, current_state):
        """Return the target of the transition from ``current_state``.

        Raises:
            TransitionNotAllowed: no transition exists for ``current_state``.
        """
        transition = self.get_transition(current_state)

        if transition is None:
            raise TransitionNotAllowed("No transition from {0}".format(current_state))

        return transition.target

    def exception_state(self, current_state):
        """Return the ``on_error`` state of the transition from ``current_state``.

        Raises:
            TransitionNotAllowed: no transition exists for ``current_state``.
        """
        transition = self.get_transition(current_state)

        if transition is None:
            raise TransitionNotAllowed("No transition from {0}".format(current_state))

        return transition.on_error
class FSMFieldDescriptor(object):
    """Descriptor that routes attribute access on an instance through an FSM field."""

    def __init__(self, field):
        self.field = field

    def __get__(self, instance, type=None):
        # Class-level access returns the descriptor itself.
        return self if instance is None else self.field.get_state(instance)

    def __set__(self, instance, value):
        fsm_field = self.field
        # A protected field may only be assigned while it has no value in
        # the instance ``__dict__`` yet.
        if fsm_field.protected and fsm_field.name in instance.__dict__:
            raise AttributeError("Direct {0} modification is not allowed".format(fsm_field.name))

        # Update state
        fsm_field.set_proxy(instance, value)
        fsm_field.set_state(instance, value)
class FSMFieldMixin(object):
    """
    Mixin that adds state-machine behaviour to a Django model field.

    Extra keyword arguments:
      * ``protected`` - when true, the descriptor refuses direct assignment
        once the field has a value on the instance.
      * ``state_choices`` - iterable of ``(state, title, proxy_cls_ref)``;
        mutually exclusive with ``choices``. It is converted to ``choices``
        and the proxy class references are kept in ``state_proxy``.
    """

    descriptor_class = FSMFieldDescriptor

    def __init__(self, *args, **kwargs):
        self.protected = kwargs.pop("protected", False)
        self.transitions = {}  # cls -> (transitions name -> method)
        self.state_proxy = {}  # state -> ProxyClsRef

        state_choices = kwargs.pop("state_choices", None)
        choices = kwargs.get("choices", None)
        if state_choices is not None and choices is not None:
            raise ValueError("Use one of choices or state_choices value")

        if state_choices is not None:
            choices = []
            for state, title, proxy_cls_ref in state_choices:
                choices.append((state, title))
                self.state_proxy[state] = proxy_cls_ref
            kwargs["choices"] = choices

        super(FSMFieldMixin, self).__init__(*args, **kwargs)

    def deconstruct(self):
        """Extend the parent's ``deconstruct`` result with ``protected`` when it is set."""
        name, path, args, kwargs = super(FSMFieldMixin, self).deconstruct()
        if self.protected:
            kwargs["protected"] = self.protected
        return name, path, args, kwargs

    def get_state(self, instance):
        """Return the current state stored on ``instance`` for this field."""
        # The state field may be deferred. We delegate the logic of figuring
        # this out and loading the deferred field on-demand to Django's
        # built-in DeferredAttribute class. DeferredAttribute's instantiation
        # signature changed over time, so we need to check Django version
        # before proceeding to call DeferredAttribute. An alternative to this
        # would be copying the latest implementation of DeferredAttribute to
        # django_fsm, but this comes with the added responsibility of keeping
        # the copied code up to date.
        if django.VERSION[:3] >= (3, 0, 0):
            return DeferredAttribute(self).__get__(instance)
        elif django.VERSION[:3] >= (2, 1, 0):
            return DeferredAttribute(self.name).__get__(instance)
        elif django.VERSION[:3] >= (1, 10, 0):
            return DeferredAttribute(self.name, model=None).__get__(instance)
        else:
            # The field was either not deferred (in which case we can return it
            # right away) or it was, but we are running on an unknown version
            # of Django and we do not know the appropriate DeferredAttribute
            # interface, and accessing the field will raise KeyError.
            return instance.__dict__[self.name]

    def set_state(self, instance, state):
        """Store ``state`` directly in the instance ``__dict__``."""
        instance.__dict__[self.name] = state

    def set_proxy(self, instance, state):
        """
        Change the class of ``instance`` to the proxy model registered for
        ``state``; do nothing when no proxy is registered for that state.

        The proxy reference is either ``"app_label.ModelName"`` or a bare
        model name, which is looked up in the instance's own app.
        """
        if state in self.state_proxy:
            state_proxy = self.state_proxy[state]

            try:
                app_label, model_name = state_proxy.split(".")
            except ValueError:
                # If we can't split, assume a model in current app
                app_label = instance._meta.app_label
                model_name = state_proxy

            model = get_model(app_label, model_name)
            if model is None:
                raise ValueError("No model found {0}".format(state_proxy))

            instance.__class__ = model

    def change_state(self, instance, method, *args, **kwargs):
        """
        Run the transition ``method`` on ``instance`` and update its state.

        ``pre_transition`` is sent before the method is called and
        ``post_transition`` afterwards, on success as well as on failure.
        When the method raises, the transition's ``on_error`` state (if one
        is configured) is applied, the exception is added to the
        ``post_transition`` kwargs, and the exception is re-raised.

        Returns:
            Whatever ``method`` returns.

        Raises:
            TransitionNotAllowed: no transition exists from the current
                state, or its conditions are not met.
        """
        meta = method._django_fsm
        method_name = method.__name__
        current_state = self.get_state(instance)

        if not meta.has_transition(current_state):
            raise TransitionNotAllowed(
                "Can't switch from state '{0}' using method '{1}'".format(current_state, method_name),
                object=instance,
                method=method,
            )
        if not meta.conditions_met(instance, current_state):
            raise TransitionNotAllowed(
                "Transition conditions have not been met for method '{0}'".format(method_name), object=instance, method=method
            )

        next_state = meta.next_state(current_state)

        signal_kwargs = {
            "sender": instance.__class__,
            "instance": instance,
            "name": method_name,
            "field": meta.field,
            "source": current_state,
            "target": next_state,
            "method_args": args,
            "method_kwargs": kwargs,
        }

        pre_transition.send(**signal_kwargs)

        try:
            result = method(instance, *args, **kwargs)
            if next_state is not None:
                if hasattr(next_state, "get_state"):
                    # Dynamic target: hand over the Transition actually being
                    # executed. The bare name ``transition`` would resolve to
                    # the module-level decorator function instead.
                    next_state = next_state.get_state(
                        instance, meta.get_transition(current_state), result, args=args, kwargs=kwargs
                    )
                    signal_kwargs["target"] = next_state
                self.set_proxy(instance, next_state)
                self.set_state(instance, next_state)
        except Exception as exc:
            exception_state = meta.exception_state(current_state)
            # ``is not None`` rather than truthiness, so that a falsy state
            # such as 0 on an integer field is still applied; this matches
            # the ``next_state is not None`` test above.
            if exception_state is not None:
                self.set_proxy(instance, exception_state)
                self.set_state(instance, exception_state)
                signal_kwargs["target"] = exception_state
            signal_kwargs["exception"] = exc
            post_transition.send(**signal_kwargs)
            raise
        else:
            post_transition.send(**signal_kwargs)

        return result

    def get_all_transitions(self, instance_cls):
        """
        Yield every ``Transition`` registered on any transition method
        collected for ``instance_cls``.
        """
        for method in self.transitions[instance_cls].values():
            for transition in method._django_fsm.transitions.values():
                yield transition

    def contribute_to_class(self, cls, name, **kwargs):
        """
        Install the descriptor and the ``get_*_transitions`` helper methods
        on ``cls`` and subscribe to ``class_prepared`` so the transition
        methods can be collected.
        """
        self.base_cls = cls

        super(FSMFieldMixin, self).contribute_to_class(cls, name, **kwargs)
        setattr(cls, self.name, self.descriptor_class(self))
        setattr(cls, "get_all_{0}_transitions".format(self.name), partialmethod(get_all_FIELD_transitions, field=self))
        setattr(
            cls, "get_available_{0}_transitions".format(self.name), partialmethod(get_available_FIELD_transitions, field=self)
        )
        setattr(
            cls,
            "get_available_user_{0}_transitions".format(self.name),
            partialmethod(get_available_user_FIELD_transitions, field=self),
        )

        class_prepared.connect(self._collect_transitions)

    def _collect_transitions(self, *args, **kwargs):
        """
        ``class_prepared`` receiver: record the transition methods of the
        ``sender`` class that belong to this field, and bind their meta
        information to this field instance.
        """
        sender = kwargs["sender"]

        # Only the class this field was added to, and its subclasses.
        if not issubclass(sender, self.base_cls):
            return

        def is_field_transition_method(attr):
            # A method belongs to this field when its meta refers to the
            # field by instance or by name, or to a field of a parent model
            # with the same name and creation counter.
            return (
                (inspect.ismethod(attr) or inspect.isfunction(attr))
                and hasattr(attr, "_django_fsm")
                and (
                    attr._django_fsm.field in [self, self.name]
                    or (
                        isinstance(attr._django_fsm.field, Field)
                        and issubclass(self.model, attr._django_fsm.field.model)
                        and attr._django_fsm.field.name == self.name
                        and attr._django_fsm.field.creation_counter == self.creation_counter
                    )
                )
            )

        sender_transitions = {}
        transitions = inspect.getmembers(sender, predicate=is_field_transition_method)
        for method_name, method in transitions:
            method._django_fsm.field = self
            sender_transitions[method_name] = method

        self.transitions[sender] = sender_transitions
class FSMField(FSMFieldMixin, models.CharField):
    """
    State machine field that stores the state in a CharField.

    ``max_length`` is set to 50 unless the caller supplies it.
    """

    def __init__(self, *args, **kwargs):
        if "max_length" not in kwargs:
            kwargs["max_length"] = 50
        super(FSMField, self).__init__(*args, **kwargs)
class FSMIntegerField(FSMFieldMixin, models.IntegerField):
    """
    Variant of FSMField that keeps the state value in an IntegerField.
    """
class FSMKeyField(FSMFieldMixin, models.ForeignKey):
    """
    State machine field backed by a ForeignKey.

    The state is read from and written to the instance ``__dict__`` under
    the field's ``attname``.
    """

    def get_state(self, instance):
        key = self.attname
        return instance.__dict__[key]

    def set_state(self, instance, state):
        converted = self.to_python(state)
        instance.__dict__[self.attname] = converted
class ConcurrentTransitionMixin(object):
    """
    Protects a Model from undesirable effects caused by concurrently executed transitions,
    e.g. running the same transition multiple times at the same time, or running different
    transitions with the same SOURCE state at the same time.

    The approach is based on optimistic locking, but needs no additional version field:
    only the state field(s) are used for tracking. It is less strict than true
    *optimistic locking*, and more lightweight.

    An instance of a model using this mixin is prevented from being saved if any of its
    state fields (instances of FSMFieldMixin) has changed in the database since the
    object was fetched. *ConcurrentTransition* is raised in that case.

    For guaranteed protection against such race conditions, make sure:
    * Your transitions do not have any side effects except for changes in the database,
    * You always run the save() method on the object within django.db.transaction.atomic()
    block.

    Following these recommendations, you can rely on ConcurrentTransitionMixin to cause
    a rollback of all the changes that have been executed in an inconsistent (out of sync)
    state, thus practically negating their effect.
    """

    def __init__(self, *args, **kwargs):
        super(ConcurrentTransitionMixin, self).__init__(*args, **kwargs)
        self._update_initial_state()

    @property
    def state_fields(self):
        """The model fields (from ``_meta.fields``) that are FSM fields."""

        def _is_fsm_field(candidate):
            return isinstance(candidate, FSMFieldMixin)

        return filter(_is_fsm_field, self._meta.fields)

    def _do_update(self, base_qs, using, pk_val, values, update_fields, forced_update):
        # _do_update is called once for each model class in the inheritance hierarchy,
        # so only the state fields (there can be several) declared on this particular
        # model can be used to narrow down base_qs.
        state_filter = {}
        for field in self.state_fields:
            if field.model == base_qs.model:
                # Besides the PK, the row must still hold the initially seen state.
                state_filter[field.attname] = self.__initial_states[field.attname]

        updated = super(ConcurrentTransitionMixin, self)._do_update(
            base_qs=base_qs.filter(**state_filter),
            using=using,
            pk_val=pk_val,
            values=values,
            update_fields=update_fields,
            forced_update=forced_update,
        )
        if updated:
            return updated

        # Nothing updated does not necessarily mean the state mismatched: the PK may
        # simply be missing. That happens when saving a new instance with a *preset PK*;
        # Django tries UPDATE first and falls back to INSERT when it fails. So only the
        # case where the object *is* in the DB (with a changed state) is reported;
        # otherwise the standard _do_update result is returned and Django runs
        # _do_insert.
        if base_qs.filter(pk=pk_val).exists():
            raise ConcurrentTransition("Cannot save object! The state has been changed since fetched from the database!")

        return updated

    def _update_initial_state(self):
        """Remember the current value of every state field, keyed by ``attname``."""
        snapshot = {}
        for field in self.state_fields:
            snapshot[field.attname] = field.value_from_object(self)
        self.__initial_states = snapshot

    def refresh_from_db(self, *args, **kwargs):
        super(ConcurrentTransitionMixin, self).refresh_from_db(*args, **kwargs)
        # The reloaded values become the new baseline.
        self._update_initial_state()

    def save(self, *args, **kwargs):
        super(ConcurrentTransitionMixin, self).save(*args, **kwargs)
        # After saving, the just-saved values become the new baseline.
        self._update_initial_state()
def transition(field, source="*", target=None, on_error=None, conditions=None, permission=None, custom=None):
    """
    Method decorator to mark allowed transitions.

    Set target to None if current state needs to be validated and
    has not changed after the function call.

    ``source`` may be a single state or a list/tuple/set of states; one
    transition is registered for each of them. ``conditions`` and ``custom``
    default to a fresh empty list / dict for each use of the decorator, so
    separately decorated methods never share a default container.

    The first decoration of a function returns a wrapper that runs the
    function through the field's ``change_state``. Decorating an already
    decorated function only registers additional transitions and returns it
    unchanged.
    """
    if conditions is None:
        conditions = []
    if custom is None:
        custom = {}

    def inner_transition(func):
        wrapper_installed, fsm_meta = True, getattr(func, "_django_fsm", None)
        if not fsm_meta:
            wrapper_installed = False
            fsm_meta = FSMMeta(field=field, method=func)
            setattr(func, "_django_fsm", fsm_meta)

        if isinstance(source, (list, tuple, set)):
            for state in source:
                func._django_fsm.add_transition(func, state, target, on_error, conditions, permission, custom)
        else:
            func._django_fsm.add_transition(func, source, target, on_error, conditions, permission, custom)

        if wrapper_installed:
            # Already wrapped by an earlier @transition; nothing more to do.
            return func

        @wraps(func)
        def _change_state(instance, *args, **kwargs):
            return fsm_meta.field.change_state(instance, func, *args, **kwargs)

        return _change_state

    return inner_transition
def can_proceed(bound_method, check_conditions=True):
    """
    Returns True if model in state allows to call bound_method

    Set ``check_conditions`` argument to ``False`` to skip checking
    conditions.

    Raises:
        TypeError: ``bound_method`` is not a transition, i.e. it has no
            ``_django_fsm`` attribute.
    """
    if not hasattr(bound_method, "_django_fsm"):
        # Resolve the underlying function lazily: an eagerly evaluated
        # ``getattr(bound_method, "__func__")`` default would raise
        # AttributeError for plain callables and hide the TypeError.
        im_func = getattr(bound_method, "im_func", None)
        if im_func is None:
            im_func = getattr(bound_method, "__func__", bound_method)
        raise TypeError("%s method is not transition" % getattr(im_func, "__name__", repr(im_func)))

    meta = bound_method._django_fsm
    im_self = getattr(bound_method, "im_self", getattr(bound_method, "__self__"))
    current_state = meta.field.get_state(im_self)

    return meta.has_transition(current_state) and (not check_conditions or meta.conditions_met(im_self, current_state))
def has_transition_perm(bound_method, user):
    """
    Returns True if model in state allows to call bound_method and user have rights on it

    Raises:
        TypeError: ``bound_method`` is not a transition, i.e. it has no
            ``_django_fsm`` attribute.
    """
    if not hasattr(bound_method, "_django_fsm"):
        # Resolve the underlying function lazily: an eagerly evaluated
        # ``getattr(bound_method, "__func__")`` default would raise
        # AttributeError for plain callables and hide the TypeError.
        im_func = getattr(bound_method, "im_func", None)
        if im_func is None:
            im_func = getattr(bound_method, "__func__", bound_method)
        raise TypeError("%s method is not transition" % getattr(im_func, "__name__", repr(im_func)))

    meta = bound_method._django_fsm
    im_self = getattr(bound_method, "im_self", getattr(bound_method, "__self__"))
    current_state = meta.field.get_state(im_self)

    return (
        meta.has_transition(current_state)
        and meta.conditions_met(im_self, current_state)
        and meta.has_transition_perm(im_self, current_state, user)
    )
class State(object):
    """Base class for target states that are computed when a transition runs."""

    def get_state(self, model, transition, result, args=[], kwargs={}):
        """Return the resulting state; subclasses must override this."""
        raise NotImplementedError()
class RETURN_VALUE(State):
    """Target state taken from the transition method's return value.

    When allowed states are given, a return value outside of them raises
    ``InvalidResultState``.
    """

    def __init__(self, *allowed_states):
        # An empty tuple means "no restriction" and is stored as None.
        self.allowed_states = allowed_states or None

    def get_state(self, model, transition, result, args=[], kwargs={}):
        permitted = self.allowed_states
        if permitted is not None and result not in permitted:
            raise InvalidResultState("{} is not in list of allowed states\n{}".format(result, permitted))
        return result
class GET_STATE(State):
    """Target state computed by calling ``func(model, *args, **kwargs)``.

    When ``states`` is given, a computed state outside of it raises
    ``InvalidResultState``.
    """

    def __init__(self, func, states=None):
        self.func = func
        self.allowed_states = states

    def get_state(self, model, transition, result, args=[], kwargs={}):
        """Return the state computed by ``func``.

        ``transition`` and ``result`` are accepted for interface
        compatibility with ``State.get_state`` and are not used.

        Raises:
            InvalidResultState: the computed state is not in the allowed states.
        """
        result_state = self.func(model, *args, **kwargs)
        if self.allowed_states is not None:
            if result_state not in self.allowed_states:
                # Report the rejected computed state, not the transition
                # method's return value.
                raise InvalidResultState(
                    "{} is not in list of allowed states\n{}".format(result_state, self.allowed_states)
                )
        return result_state