Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/contrib/contenttypes/management/__init__.py: 58%

77 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1from django.apps import apps as global_apps 

2from django.db import DEFAULT_DB_ALIAS, IntegrityError, migrations, router, transaction 

3 

4 

class RenameContentType(migrations.RunPython):
    """
    Migration operation that updates the ``model`` field of a stored
    ContentType row from one model name to another, in either direction.
    """

    def __init__(self, app_label, old_model, new_model):
        """
        Remember the app label and both model names, then register the
        forward/backward callables with ``RunPython``.
        """
        self.app_label = app_label
        self.old_model = old_model
        self.new_model = new_model
        super().__init__(self.rename_forward, self.rename_backward)

    def _rename(self, apps, schema_editor, old_model, new_model):
        """
        Point the content type identified by ``(self.app_label, old_model)``
        at ``new_model`` on the schema editor's database.

        Does nothing when the router disallows migrating ContentType on that
        database or when no matching content type exists.
        """
        ContentType = apps.get_model("contenttypes", "ContentType")
        alias = schema_editor.connection.alias
        if not router.allow_migrate_model(alias, ContentType):
            return

        manager = ContentType.objects.db_manager(alias)
        try:
            renamed = manager.get_by_natural_key(self.app_label, old_model)
        except ContentType.DoesNotExist:
            # Nothing stored under the old name, so there is nothing to rename.
            return

        renamed.model = new_model
        try:
            with transaction.atomic(using=alias):
                renamed.save(using=alias, update_fields={"model"})
        except IntegrityError:
            # A stale content type conflicts with the new name. Restore the
            # old name on the instance and carry on; remove_stale_contenttypes
            # will take care of asking the user what should be done next.
            renamed.model = old_model
            return

        # The `get_by_natural_key()` lookup cached the instance under its old
        # model name, so the cache has to be dropped after a successful save.
        ContentType.objects.clear_cache()

    def rename_forward(self, apps, schema_editor):
        """Rename the content type from ``old_model`` to ``new_model``."""
        self._rename(apps, schema_editor, self.old_model, self.new_model)

    def rename_backward(self, apps, schema_editor):
        """Rename the content type from ``new_model`` back to ``old_model``."""
        self._rename(apps, schema_editor, self.new_model, self.old_model)

44 

45 

def inject_rename_contenttypes_operations(
    plan=None, apps=global_apps, using=DEFAULT_DB_ALIAS, **kwargs
):
    """
    Insert a `RenameContentType` operation after every planned `RenameModel`
    operation.

    Returns without touching the plan when no plan is given or when the
    router disallows migrating ContentType on the `using` database.
    """
    if plan is None:
        return

    # Work out whether the ContentType model can be used from the start.
    try:
        ContentType = apps.get_model("contenttypes", "ContentType")
    except LookupError:
        model_ready = False
    else:
        if not router.allow_migrate_model(using, ContentType):
            return
        model_ready = True

    for migration, backward in plan:
        is_initial = (
            migration.app_label == "contenttypes"
            and migration.name == "0001_initial"
        )
        if is_initial:
            # Unapplying the initial contenttypes migration makes the
            # ContentType model unavailable from here on, so stop; applying
            # it makes the model available for the migrations that follow.
            if backward:
                break
            model_ready = True
            continue

        # The ContentType model is not available yet.
        if not model_ready:
            continue

        # Pair each RenameModel with the slot right after it (in terms of the
        # original, unmodified operation list) and the operation to put there.
        pending = [
            (
                position + 1,
                RenameContentType(
                    migration.app_label,
                    candidate.old_name_lower,
                    candidate.new_name_lower,
                ),
            )
            for position, candidate in enumerate(migration.operations)
            if isinstance(candidate, migrations.RenameModel)
        ]

        # Inserting from the back keeps the earlier slots valid, so no
        # running offset is needed.
        for position, rename_operation in reversed(pending):
            migration.operations.insert(position, rename_operation)

90 

91 

def get_contenttypes_and_models(app_config, using, ContentType):
    """
    Return a pair of dicts for the given app: the content types found on the
    `using` database keyed by their ``model`` value, and the app's model
    classes keyed by ``_meta.model_name``.

    Return ``(None, None)`` when the router disallows migrating ContentType
    on that database.
    """
    if not router.allow_migrate_model(using, ContentType):
        return None, None

    ContentType.objects.clear_cache()

    stored = ContentType.objects.using(using).filter(app_label=app_config.label)
    types_by_model = {}
    for content_type in stored:
        types_by_model[content_type.model] = content_type

    models_by_name = {}
    for model_class in app_config.get_models():
        models_by_name[model_class._meta.model_name] = model_class

    return types_by_model, models_by_name

104 

105 

def create_contenttypes(
    app_config,
    verbosity=2,
    interactive=True,
    using=DEFAULT_DB_ALIAS,
    apps=global_apps,
    **kwargs,
):
    """
    Create content types for models in the given app.

    Only models without an existing content type on the `using` database get
    one. Each created type is printed when `verbosity` is 2 or higher.
    """
    if not app_config.models_module:
        return

    app_label = app_config.label
    try:
        # Re-resolve the app config and the ContentType model through the
        # `apps` registry that was passed in.
        app_config = apps.get_app_config(app_label)
        ContentType = apps.get_model("contenttypes", "ContentType")
    except LookupError:
        return

    existing_types, models_by_name = get_contenttypes_and_models(
        app_config, using, ContentType
    )

    # Covers both "no models" and the (None, None) result.
    if not models_by_name:
        return

    missing = []
    for model_name in models_by_name:
        if model_name not in existing_types:
            missing.append(ContentType(app_label=app_label, model=model_name))

    ContentType.objects.using(using).bulk_create(missing)

    if verbosity >= 2:
        for created in missing:
            print("Adding content type '%s | %s'" % (created.app_label, created.model))