Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django_fsm/__init__.py: 50%

329 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1# -*- coding: utf-8 -*- 

2""" 

3State tracking functionality for django models 

4""" 

5import inspect 

6import sys 

7from functools import wraps 

8 

9import django 

10from django.db import models 

11from django.db.models import Field 

12from django.db.models.query_utils import DeferredAttribute 

13from django.db.models.signals import class_prepared 

14from django_fsm.signals import pre_transition, post_transition 

15 

16try: 

17 from functools import partialmethod 

18except ImportError: 

19 # python 2.7, so we are on django<=1.11 

20 from django.utils.functional import curry as partialmethod 

21 

try:
    from django.apps import apps as django_apps
except ImportError:
    # Older Django without the app registry: use the legacy loader.
    from django.db.models.loading import get_model
else:

    def get_model(app_label, model_name):
        """Look up ``model_name`` in the app config registered as ``app_label``."""
        return django_apps.get_app_config(app_label).get_model(model_name)

32 

33 

34__all__ = [ 

35 "TransitionNotAllowed", 

36 "ConcurrentTransition", 

37 "FSMFieldMixin", 

38 "FSMField", 

39 "FSMIntegerField", 

40 "FSMKeyField", 

41 "ConcurrentTransitionMixin", 

42 "transition", 

43 "can_proceed", 

44 "has_transition_perm", 

45 "GET_STATE", 

46 "RETURN_VALUE", 

47] 

48 

if sys.version_info[:2] == (2, 6):
    # Python 2.6 ships a buggy inspect.getmembers, so the Python 2.7
    # implementation is installed in its place.
    def __getmembers(object, predicate=None):
        """Return the (name, value) pairs of ``object``'s members, sorted by name.

        When ``predicate`` is given, only members whose value satisfies it
        are included. Attributes listed by ``dir()`` that raise
        AttributeError on access are skipped.
        """
        members = []
        for attr_name in dir(object):
            try:
                attr_value = getattr(object, attr_name)
            except AttributeError:
                continue
            if predicate and not predicate(attr_value):
                continue
            members.append((attr_name, attr_value))
        return sorted(members)

    inspect.getmembers = __getmembers

67 

# South support; see http://south.aeracode.org/docs/tutorial/part4.html#simple-inheritance
try:
    from south.modelsinspector import add_introspection_rules
except ImportError:
    # South is optional; nothing to register when it is not installed.
    pass
else:
    # One rule per FSM field class, registered in the same order as before.
    for _fsm_field_name in ("FSMField", "FSMIntegerField", "FSMKeyField"):
        add_introspection_rules([], [r"^django_fsm\." + _fsm_field_name])
    del _fsm_field_name

77 

78 

class TransitionNotAllowed(Exception):
    """Error raised when a requested state transition may not be performed.

    The optional ``object`` and ``method`` keyword arguments are stored on
    the exception (``None`` when omitted); all remaining arguments are
    forwarded to ``Exception``.
    """

    def __init__(self, *args, **kwargs):
        for attr_name in ("object", "method"):
            setattr(self, attr_name, kwargs.pop(attr_name, None))
        super(TransitionNotAllowed, self).__init__(*args, **kwargs)

86 

87 

class InvalidResultState(Exception):
    """Error raised when a computed result state is not among the allowed states."""

90 

91 

class ConcurrentTransition(Exception):
    """
    Error raised when a transition cannot be executed because the object
    is stale, i.e. its state was changed after it was fetched from the
    database.
    """

98 

99 

class Transition(object):
    """Plain record describing one transition of a decorated model method."""

    def __init__(self, method, source, target, on_error, conditions, permission, custom):
        self.method, self.source, self.target = method, source, target
        self.on_error = on_error
        self.conditions = conditions
        self.permission = permission
        self.custom = custom

    @property
    def name(self):
        """Name of the method implementing this transition."""
        return self.method.__name__

    def has_perm(self, instance, user):
        """Tell whether ``user`` may run this transition on ``instance``.

        No permission configured means everybody is allowed. A callable
        permission is invoked as ``permission(instance, user)``. Any other
        value is checked through ``user.has_perm``, first against the
        instance and then without an object.
        """
        required = self.permission
        if not required:
            return True
        if callable(required):
            return bool(required(instance, user))
        return bool(user.has_perm(required, instance) or user.has_perm(required))

125 

126 

def get_available_FIELD_transitions(instance, field):
    """
    Yield the transitions that can be taken from the instance's current
    state, i.e. those that exist for that state and whose conditions are
    all met.
    """
    state = field.get_state(instance)

    for method in field.transitions[instance.__class__].values():
        fsm_meta = method._django_fsm
        if not fsm_meta.has_transition(state):
            continue
        if not fsm_meta.conditions_met(instance, state):
            continue
        yield fsm_meta.get_transition(state)

139 

140 

def get_all_FIELD_transitions(instance, field):
    """
    Return every transition the field knows for the instance's class,
    as produced by ``field.get_all_transitions``.
    """
    model_cls = instance.__class__
    return field.get_all_transitions(model_cls)

146 

147 

def get_available_user_FIELD_transitions(instance, user, field):
    """
    Yield the transitions available from the instance's current state
    (conditions met) that ``user`` also has permission to run.
    """
    for candidate in get_available_FIELD_transitions(instance, field):
        if not candidate.has_perm(instance, user):
            continue
        yield candidate

156 

157 

class FSMMeta(object):
    """
    Transition bookkeeping attached to a decorated model method.

    ``transitions`` maps a source state to its ``Transition``. The special
    keys ``"*"`` and ``"+"`` act as fallbacks when the exact source state
    has no entry.
    """

    def __init__(self, field, method):
        # ``method`` is accepted but not stored.
        self.field = field
        self.transitions = {}  # source -> Transition

    def get_transition(self, source):
        """Return the transition for ``source``, then ``"*"``, then ``"+"``; else None."""
        for key in (source, "*", "+"):
            found = self.transitions.get(key, None)
            if found is not None:
                return found
        return None

    def add_transition(self, method, source, target, on_error=None, conditions=[], permission=None, custom={}):
        """Register a transition for ``source``.

        Raises AssertionError when ``source`` already has a transition.
        """
        if source in self.transitions:
            raise AssertionError("Duplicate transition for {0} state".format(source))

        registered = Transition(
            method=method,
            source=source,
            target=target,
            on_error=on_error,
            conditions=conditions,
            permission=permission,
            custom=custom,
        )
        self.transitions[source] = registered

    def has_transition(self, state):
        """
        Tell whether this method offers any transition out of ``state``.

        A ``"+"`` entry counts only when its target differs from ``state``.
        """
        known = self.transitions
        if state in known or "*" in known:
            return True
        return bool("+" in known and known["+"].target != state)

    def conditions_met(self, instance, state):
        """
        Tell whether every condition of the transition for ``state`` holds.

        False when there is no transition; True when the transition's
        conditions attribute is None.
        """
        selected = self.get_transition(state)
        if selected is None:
            return False
        if selected.conditions is None:
            return True
        return all(condition(instance) for condition in selected.conditions)

    def has_transition_perm(self, instance, state, user):
        """Tell whether ``user`` may run the transition for ``state``; False if none exists."""
        selected = self.get_transition(state)
        return selected.has_perm(instance, user) if selected else False

    def next_state(self, current_state):
        """Return the target of the transition from ``current_state``.

        Raises TransitionNotAllowed when there is no such transition.
        """
        selected = self.get_transition(current_state)
        if selected is not None:
            return selected.target
        raise TransitionNotAllowed("No transition from {0}".format(current_state))

    def exception_state(self, current_state):
        """Return the ``on_error`` state of the transition from ``current_state``.

        Raises TransitionNotAllowed when there is no such transition.
        """
        selected = self.get_transition(current_state)
        if selected is not None:
            return selected.on_error
        raise TransitionNotAllowed("No transition from {0}".format(current_state))

240 

241 

class FSMFieldDescriptor(object):
    """Descriptor routing reads and writes of the state attribute through its field."""

    def __init__(self, field):
        self.field = field

    def __get__(self, instance, type=None):
        # Class-level access returns the descriptor itself.
        return self if instance is None else self.field.get_state(instance)

    def __set__(self, instance, value):
        fsm_field = self.field
        # A protected field accepts a write only while the attribute is
        # not yet present in the instance __dict__.
        locked = fsm_field.protected and fsm_field.name in instance.__dict__
        if locked:
            raise AttributeError("Direct {0} modification is not allowed".format(fsm_field.name))

        # Update state
        fsm_field.set_proxy(instance, value)
        fsm_field.set_state(instance, value)

258 

259 

class FSMFieldMixin(object):
    """
    Field mixin that stores a model's state and executes its transitions.

    Keyword arguments consumed by the constructor (everything else is
    forwarded to the next class in the MRO):

    * ``protected`` -- stored on the field; the descriptor installed by
      ``contribute_to_class`` consults it before allowing assignment.
    * ``state_choices`` -- iterable of ``(state, title, proxy_cls_ref)``
      triples. It is converted into ``choices`` and a state -> proxy class
      reference map, and cannot be combined with ``choices``.
    """

    descriptor_class = FSMFieldDescriptor

    def __init__(self, *args, **kwargs):
        self.protected = kwargs.pop("protected", False)
        self.transitions = {}  # cls -> (transitions name -> method)
        self.state_proxy = {}  # state -> ProxyClsRef

        state_choices = kwargs.pop("state_choices", None)
        choices = kwargs.get("choices", None)
        if state_choices is not None and choices is not None:
            raise ValueError("Use one of choices or state_choices value")

        if state_choices is not None:
            choices = []
            for state, title, proxy_cls_ref in state_choices:
                choices.append((state, title))
                self.state_proxy[state] = proxy_cls_ref
            kwargs["choices"] = choices

        super(FSMFieldMixin, self).__init__(*args, **kwargs)

    def deconstruct(self):
        """Extend the parent's deconstruction with ``protected`` when it is set."""
        name, path, args, kwargs = super(FSMFieldMixin, self).deconstruct()
        if self.protected:
            kwargs["protected"] = self.protected
        return name, path, args, kwargs

    def get_state(self, instance):
        """Return the current state value stored on ``instance``."""
        # The state field may be deferred. We delegate the logic of figuring
        # this out and loading the deferred field on-demand to Django's
        # built-in DeferredAttribute class. DeferredAttribute's instantiation
        # signature changed over time, so we need to check Django version
        # before proceeding to call DeferredAttribute. An alternative to this
        # would be copying the latest implementation of DeferredAttribute to
        # django_fsm, but this comes with the added responsibility of keeping
        # the copied code up to date.
        if django.VERSION[:3] >= (3, 0, 0):
            return DeferredAttribute(self).__get__(instance)
        elif django.VERSION[:3] >= (2, 1, 0):
            return DeferredAttribute(self.name).__get__(instance)
        elif django.VERSION[:3] >= (1, 10, 0):
            return DeferredAttribute(self.name, model=None).__get__(instance)
        else:
            # The field was either not deferred (in which case we can return it
            # right away) or it was, but we are running on an unknown version
            # of Django and we do not know the appropriate DeferredAttribute
            # interface, and accessing the field will raise KeyError.
            return instance.__dict__[self.name]

    def set_state(self, instance, state):
        """Write ``state`` straight into the instance ``__dict__``."""
        instance.__dict__[self.name] = state

    def set_proxy(self, instance, state):
        """
        Switch ``instance`` to the proxy class registered for ``state``.

        Does nothing when no proxy is registered for the state. The proxy
        reference is either ``"app_label.ModelName"`` or a bare model name,
        which is looked up in the instance's own app.

        Raises ValueError when the model lookup returns None.
        """
        if state in self.state_proxy:
            state_proxy = self.state_proxy[state]

            try:
                app_label, model_name = state_proxy.split(".")
            except ValueError:
                # If we can't split, assume a model in current app
                app_label = instance._meta.app_label
                model_name = state_proxy

            model = get_model(app_label, model_name)
            if model is None:
                raise ValueError("No model found {0}".format(state_proxy))

            instance.__class__ = model

    def change_state(self, instance, method, *args, **kwargs):
        """
        Run the transition ``method`` on ``instance`` and update its state.

        ``pre_transition`` is sent before the method runs and
        ``post_transition`` afterwards. If the method raises, the
        transition's ``on_error`` state (when set) is applied,
        ``post_transition`` is sent with the exception attached, and the
        exception is re-raised.

        Returns whatever ``method`` returns.

        Raises TransitionNotAllowed when the current state has no
        transition for this method or its conditions are not met.
        """
        meta = method._django_fsm
        method_name = method.__name__
        current_state = self.get_state(instance)

        if not meta.has_transition(current_state):
            raise TransitionNotAllowed(
                "Can't switch from state '{0}' using method '{1}'".format(current_state, method_name),
                object=instance,
                method=method,
            )
        if not meta.conditions_met(instance, current_state):
            raise TransitionNotAllowed(
                "Transition conditions have not been met for method '{0}'".format(method_name), object=instance, method=method
            )

        next_state = meta.next_state(current_state)

        signal_kwargs = {
            "sender": instance.__class__,
            "instance": instance,
            "name": method_name,
            "field": meta.field,
            "source": current_state,
            "target": next_state,
            "method_args": args,
            "method_kwargs": kwargs,
        }

        pre_transition.send(**signal_kwargs)

        try:
            result = method(instance, *args, **kwargs)
            if next_state is not None:
                if hasattr(next_state, "get_state"):
                    # Dynamic target: hand the State object the Transition
                    # being executed. The bare name ``transition`` would
                    # resolve to the module-level decorator function here.
                    active_transition = meta.get_transition(current_state)
                    next_state = next_state.get_state(instance, active_transition, result, args=args, kwargs=kwargs)
                    signal_kwargs["target"] = next_state
                self.set_proxy(instance, next_state)
                self.set_state(instance, next_state)
        except Exception as exc:
            exception_state = meta.exception_state(current_state)
            if exception_state:
                self.set_proxy(instance, exception_state)
                self.set_state(instance, exception_state)
                signal_kwargs["target"] = exception_state
                signal_kwargs["exception"] = exc
                post_transition.send(**signal_kwargs)
            raise
        else:
            post_transition.send(**signal_kwargs)

        return result

    def get_all_transitions(self, instance_cls):
        """
        Yield every Transition object registered for this field on
        ``instance_cls``, across all of its transition methods.
        """
        transitions = self.transitions[instance_cls]

        for method in transitions.values():
            meta = method._django_fsm

            for transition_obj in meta.transitions.values():
                yield transition_obj

    def contribute_to_class(self, cls, name, **kwargs):
        """
        Install the state descriptor and the ``get_*_transitions`` helper
        methods on ``cls``, and subscribe to ``class_prepared`` so that
        transitions are collected once model classes are ready.
        """
        self.base_cls = cls

        super(FSMFieldMixin, self).contribute_to_class(cls, name, **kwargs)
        setattr(cls, self.name, self.descriptor_class(self))
        setattr(cls, "get_all_{0}_transitions".format(self.name), partialmethod(get_all_FIELD_transitions, field=self))
        setattr(
            cls, "get_available_{0}_transitions".format(self.name), partialmethod(get_available_FIELD_transitions, field=self)
        )
        setattr(
            cls,
            "get_available_user_{0}_transitions".format(self.name),
            partialmethod(get_available_user_FIELD_transitions, field=self),
        )

        class_prepared.connect(self._collect_transitions)

    def _collect_transitions(self, *args, **kwargs):
        """
        ``class_prepared`` receiver: record the transition methods of the
        prepared class that belong to this field.

        Classes that are not subclasses of the class this field was added
        to are ignored.
        """
        sender = kwargs["sender"]

        if not issubclass(sender, self.base_cls):
            return

        def is_field_transition_method(attr):
            # A transition method matches when its metadata names this
            # field (by object or by name), or names a field with the same
            # name and creation counter on a model that this field's model
            # subclasses.
            return (
                (inspect.ismethod(attr) or inspect.isfunction(attr))
                and hasattr(attr, "_django_fsm")
                and (
                    attr._django_fsm.field in [self, self.name]
                    or (
                        isinstance(attr._django_fsm.field, Field)
                        and issubclass(self.model, attr._django_fsm.field.model)
                        and attr._django_fsm.field.name == self.name
                        and attr._django_fsm.field.creation_counter == self.creation_counter
                    )
                )
            )

        sender_transitions = {}
        transitions = inspect.getmembers(sender, predicate=is_field_transition_method)
        for method_name, method in transitions:
            # Rebind the metadata to this field instance.
            method._django_fsm.field = self
            sender_transitions[method_name] = method

        self.transitions[sender] = sender_transitions

443 

444 

class FSMField(FSMFieldMixin, models.CharField):
    """
    State machine field that keeps the state in a CharField.

    ``max_length`` defaults to 50 when the caller does not supply one.
    """

    def __init__(self, *args, **kwargs):
        if "max_length" not in kwargs:
            kwargs["max_length"] = 50
        super(FSMField, self).__init__(*args, **kwargs)

453 

454 

class FSMIntegerField(FSMFieldMixin, models.IntegerField):
    """
    State machine field that keeps the state value in an IntegerField;
    otherwise the same as FSMField.
    """

461 

462 

class FSMKeyField(FSMFieldMixin, models.ForeignKey):
    """
    State machine field that keeps the state in a ForeignKey.

    The state is read from and written to the instance ``__dict__`` under
    the field's ``attname``.
    """

    def get_state(self, instance):
        return vars(instance)[self.attname]

    def set_state(self, instance, state):
        vars(instance)[self.attname] = self.to_python(state)

473 

474 

class ConcurrentTransitionMixin(object):
    """
    Model mixin guarding against concurrently executed transitions, such as
    the same transition running several times at once, or different
    transitions starting from the same SOURCE state at the same time.

    The approach is based on optimistic locking, but needs no extra version
    field: only the state field(s) are tracked. It is less strict than true
    *optimistic locking*, yet more lightweight because it relies on the
    specifics of FSM models.

    Saving an instance is refused, with *ConcurrentTransition* raised, when
    any of its state fields (instances of FSMFieldMixin) was changed in the
    database after the object was fetched.

    For guaranteed protection against such race conditions, make sure:
    * Your transitions have no side effects other than database changes,
    * You always call save() inside a django.db.transaction.atomic() block.

    With both in place, ConcurrentTransitionMixin causes a rollback of all
    changes made from an inconsistent (out of sync) state, practically
    negating their effect.
    """

    def __init__(self, *args, **kwargs):
        super(ConcurrentTransitionMixin, self).__init__(*args, **kwargs)
        self._update_initial_state()

    @property
    def state_fields(self):
        """Iterator over the model's fields that are FSM state fields."""

        def is_state_field(candidate):
            return isinstance(candidate, FSMFieldMixin)

        return filter(is_state_field, self._meta.fields)

    def _do_update(self, base_qs, using, pk_val, values, update_fields, forced_update):
        # _do_update runs once per model class in the inheritance hierarchy,
        # so only the state fields (there may be several) that belong to the
        # model of this base_qs can be used to narrow the query. The
        # resulting filter is applied on top of the standard PK-only filter.
        state_filter = {
            field.attname: self.__initial_states[field.attname]
            for field in self.state_fields
            if field.model == base_qs.model
        }
        narrowed_qs = base_qs.filter(**state_filter)

        updated = super(ConcurrentTransitionMixin, self)._do_update(
            base_qs=narrowed_qs,
            using=using,
            pk_val=pk_val,
            values=values,
            update_fields=update_fields,
            forced_update=forced_update,
        )

        # Nothing updated does not always mean the state no longer matches:
        # when a new instance is saved with a *preset PK*, Django does not
        # know it must INSERT, tries UPDATE first and falls back to INSERT.
        # So raise only when the row *is* in the DB (hence its state has
        # changed); otherwise behave like the standard _do_update so Django
        # goes on to execute _do_insert.
        row_exists = not updated and base_qs.filter(pk=pk_val).exists()
        if row_exists:
            raise ConcurrentTransition("Cannot save object! The state has been changed since fetched from the database!")

        return updated

    def _update_initial_state(self):
        """Remember the current value of every state field, keyed by attname."""
        self.__initial_states = {field.attname: field.value_from_object(self) for field in self.state_fields}

    def refresh_from_db(self, *args, **kwargs):
        super(ConcurrentTransitionMixin, self).refresh_from_db(*args, **kwargs)
        self._update_initial_state()

    def save(self, *args, **kwargs):
        super(ConcurrentTransitionMixin, self).save(*args, **kwargs)
        self._update_initial_state()

549 

550 

def transition(field, source="*", target=None, on_error=None, conditions=None, permission=None, custom=None):
    """
    Method decorator to mark allowed transitions.

    Set target to None if current state needs to be validated and
    has not changed after the function call.

    ``source`` may be a single state or a list/tuple/set of states; one
    transition is registered per source state. ``conditions`` and
    ``custom`` default to a fresh empty list and dict for each decorator
    call, so transitions declared without them never share one mutable
    object.

    Applying the decorator again to an already decorated function only
    adds transitions to its existing metadata and returns the function
    unchanged.
    """
    # Build the defaults per call; mutable defaults in the signature would
    # be shared by every decorated method in the process.
    conditions = [] if conditions is None else conditions
    custom = {} if custom is None else custom

    def inner_transition(func):
        wrapper_installed, fsm_meta = True, getattr(func, "_django_fsm", None)
        if not fsm_meta:
            wrapper_installed = False
            fsm_meta = FSMMeta(field=field, method=func)
            setattr(func, "_django_fsm", fsm_meta)

        if isinstance(source, (list, tuple, set)):
            for state in source:
                func._django_fsm.add_transition(func, state, target, on_error, conditions, permission, custom)
        else:
            func._django_fsm.add_transition(func, source, target, on_error, conditions, permission, custom)

        @wraps(func)
        def _change_state(instance, *args, **kwargs):
            return fsm_meta.field.change_state(instance, func, *args, **kwargs)

        if not wrapper_installed:
            return _change_state

        return func

    return inner_transition

582 

583 

def can_proceed(bound_method, check_conditions=True):
    """
    Returns True if model in state allows to call bound_method

    Set ``check_conditions`` argument to ``False`` to skip checking
    conditions.

    Raises TypeError when ``bound_method`` is not a transition, i.e. it
    carries no ``_django_fsm`` metadata. This also holds for plain
    functions and other callables that have no ``__func__`` attribute.
    """
    if not hasattr(bound_method, "_django_fsm"):
        # Fall back to the object itself so that a non-method callable is
        # reported with the intended TypeError instead of an AttributeError
        # from the attribute lookup.
        im_func = getattr(bound_method, "im_func", getattr(bound_method, "__func__", bound_method))
        raise TypeError("%s method is not transition" % getattr(im_func, "__name__", repr(im_func)))

    meta = bound_method._django_fsm
    im_self = getattr(bound_method, "im_self", getattr(bound_method, "__self__"))
    current_state = meta.field.get_state(im_self)

    return meta.has_transition(current_state) and (not check_conditions or meta.conditions_met(im_self, current_state))

600 

601 

def has_transition_perm(bound_method, user):
    """
    Returns True if model in state allows to call bound_method and user have rights on it

    Raises TypeError when ``bound_method`` is not a transition, i.e. it
    carries no ``_django_fsm`` metadata. This also holds for plain
    functions and other callables that have no ``__func__`` attribute.
    """
    if not hasattr(bound_method, "_django_fsm"):
        # Fall back to the object itself so that a non-method callable is
        # reported with the intended TypeError instead of an AttributeError
        # from the attribute lookup.
        im_func = getattr(bound_method, "im_func", getattr(bound_method, "__func__", bound_method))
        raise TypeError("%s method is not transition" % getattr(im_func, "__name__", repr(im_func)))

    meta = bound_method._django_fsm
    im_self = getattr(bound_method, "im_self", getattr(bound_method, "__self__"))
    current_state = meta.field.get_state(im_self)

    return (
        meta.has_transition(current_state)
        and meta.conditions_met(im_self, current_state)
        and meta.has_transition_perm(im_self, current_state, user)
    )

619 

620 

class State(object):
    """Base class for transition targets that are computed at run time."""

    def get_state(self, model, transition, result, args=[], kwargs={}):
        """Compute the target state; subclasses must override this."""
        raise NotImplementedError()

624 

625 

class RETURN_VALUE(State):
    """
    Dynamic target that uses the transition method's return value as the
    new state, optionally restricted to a set of allowed states.
    """

    def __init__(self, *allowed_states):
        # No states given means no restriction.
        self.allowed_states = allowed_states or None

    def get_state(self, model, transition, result, args=[], kwargs={}):
        """Return ``result``; raise InvalidResultState if it is not an allowed state."""
        permitted = self.allowed_states
        if permitted is not None and result not in permitted:
            raise InvalidResultState("{} is not in list of allowed states\n{}".format(result, permitted))
        return result

635 

636 

class GET_STATE(State):
    """
    Dynamic target that asks a callable for the new state.

    ``func`` is called as ``func(model, *args, **kwargs)`` with the
    arguments the transition was invoked with. ``states``, when given,
    restricts the values it may return.
    """

    def __init__(self, func, states=None):
        self.func = func
        self.allowed_states = states

    def get_state(self, model, transition, result, args=[], kwargs={}):
        """Return the state computed by ``func``.

        Raises InvalidResultState when allowed states are configured and
        the computed state is not one of them.
        """
        result_state = self.func(model, *args, **kwargs)
        if self.allowed_states is not None:
            if result_state not in self.allowed_states:
                # Report the rejected state itself, not the transition
                # method's return value.
                raise InvalidResultState(
                    "{} is not in list of allowed states\n{}".format(result_state, self.allowed_states)
                )
        return result_state