Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/db/backends/base/creation.py: 44%

152 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1import os 

2import sys 

3from io import StringIO 

4from unittest import expectedFailure, skip 

5 

6from django.apps import apps 

7from django.conf import settings 

8from django.core import serializers 

9from django.db import router 

10from django.db.transaction import atomic 

11from django.utils.module_loading import import_string 

12 

# Prefix prepended to the default database's name to build the name of the
# test database (used when no explicit TEST["NAME"] is configured).
TEST_DATABASE_PREFIX = "test_"

16 

17 

class BaseDatabaseCreation:
    """
    Encapsulate backend-specific differences pertaining to creation and
    destruction of the test database.

    The instance wraps a single database ``connection`` and reads/writes its
    ``settings_dict`` (notably the ``NAME`` and ``TEST`` entries).
    """

    def __init__(self, connection):
        """Store the database connection wrapper this object operates on."""
        self.connection = connection

    def _nodb_cursor(self):
        """Return the connection's ``_nodb_cursor()`` context manager."""
        return self.connection._nodb_cursor()

    def log(self, msg):
        """Write ``msg`` followed by a newline to ``sys.stderr``."""
        # sys.stderr is a text-mode stream: it translates "\n" itself, so
        # os.linesep must not be used here (it would yield "\r\r\n" on Windows).
        sys.stderr.write(msg + "\n")

    def create_test_db(
        self, verbosity=1, autoclobber=False, serialize=True, keepdb=False
    ):
        """
        Create a test database, prompting the user for confirmation if the
        database already exists. Return the name of the test database created.

        ``verbosity`` controls logging (a message is logged when >= 1, and
        migrate runs at one level lower). ``autoclobber`` and ``keepdb`` are
        forwarded to ``_create_test_db()``. When ``serialize`` is true, the
        database contents are serialized and stored on the connection as
        ``_test_serialized_contents``.
        """
        # Don't import django.core.management if it isn't needed.
        from django.core.management import call_command

        test_database_name = self._get_test_db_name()

        if verbosity >= 1:
            action = "Creating"
            if keepdb:
                action = "Using existing"

            self.log(
                "%s test database for alias %s..."
                % (
                    action,
                    self._get_database_display_str(verbosity, test_database_name),
                )
            )

        # We could skip this call if keepdb is True, but we instead
        # give it the keepdb param. This is to handle the case
        # where the test DB doesn't exist, in which case we need to
        # create it, then just not destroy it. If we instead skip
        # this, we will get an exception.
        self._create_test_db(verbosity, autoclobber, keepdb)

        # Point both the global settings and this connection at the test
        # database from now on.
        self.connection.close()
        settings.DATABASES[self.connection.alias]["NAME"] = test_database_name
        self.connection.settings_dict["NAME"] = test_database_name

        try:
            if self.connection.settings_dict["TEST"]["MIGRATE"] is False:
                # Disable migrations for all apps.
                old_migration_modules = settings.MIGRATION_MODULES
                settings.MIGRATION_MODULES = {
                    app.label: None for app in apps.get_app_configs()
                }
            # We report migrate messages at one level lower than that
            # requested. This ensures we don't get flooded with messages during
            # testing (unless you really ask to be flooded).
            call_command(
                "migrate",
                verbosity=max(verbosity - 1, 0),
                interactive=False,
                database=self.connection.alias,
                run_syncdb=True,
            )
        finally:
            # Restore the original MIGRATION_MODULES if it was overridden above.
            if self.connection.settings_dict["TEST"]["MIGRATE"] is False:
                settings.MIGRATION_MODULES = old_migration_modules

        # We then serialize the current state of the database into a string
        # and store it on the connection. This slightly horrific process is so
        # people who are testing on databases without transactions or who are
        # using a TransactionTestCase still get a clean database on every test
        # run.
        if serialize:
            self.connection._test_serialized_contents = self.serialize_db_to_string()

        call_command("createcachetable", database=self.connection.alias)

        # Ensure a connection for the side effect of initializing the test
        # database.
        self.connection.ensure_connection()

        if os.environ.get("RUNNING_DJANGOS_TEST_SUITE") == "true":
            self.mark_expected_failures_and_skips()

        return test_database_name

    def set_as_test_mirror(self, primary_settings_dict):
        """
        Set this database up to be used in testing as a mirror of a primary
        database whose settings are given.

        Only the ``NAME`` entry of ``primary_settings_dict`` is copied into
        this connection's settings.
        """
        self.connection.settings_dict["NAME"] = primary_settings_dict["NAME"]

    def serialize_db_to_string(self):
        """
        Serialize all data in the database into a JSON string.
        Designed only for test runner usage; will not handle large
        amounts of data.
        """

        # Iteratively return every object for all models to serialize.
        def get_objects():
            from django.db.migrations.loader import MigrationLoader

            loader = MigrationLoader(self.connection)
            for app_config in apps.get_app_configs():
                if (
                    app_config.models_module is not None
                    and app_config.label in loader.migrated_apps
                    and app_config.name not in settings.TEST_NON_SERIALIZED_APPS
                ):
                    for model in app_config.get_models():
                        if model._meta.can_migrate(
                            self.connection
                        ) and router.allow_migrate_model(self.connection.alias, model):
                            # Order by primary key so the output is stable.
                            queryset = model._base_manager.using(
                                self.connection.alias,
                            ).order_by(model._meta.pk.name)
                            yield from queryset.iterator()

        # Serialize to a string
        out = StringIO()
        serializers.serialize("json", get_objects(), indent=None, stream=out)
        return out.getvalue()

    def deserialize_db_from_string(self, data):
        """
        Reload the database with data from a string generated by
        the serialize_db_to_string() method.
        """
        data = StringIO(data)
        table_names = set()
        # Load data in a transaction to handle forward references and cycles.
        with atomic(using=self.connection.alias):
            # Disable constraint checks, because some databases (MySQL) don't
            # support deferred checks.
            with self.connection.constraint_checks_disabled():
                for obj in serializers.deserialize(
                    "json", data, using=self.connection.alias
                ):
                    obj.save()
                    table_names.add(obj.object.__class__._meta.db_table)
            # Manually check for any invalid keys that might have been added,
            # because constraint checks were disabled.
            self.connection.check_constraints(table_names=table_names)

    def _get_database_display_str(self, verbosity, database_name):
        """
        Return display string for a database for use in various actions.

        The result is the quoted connection alias, followed by the quoted
        ``database_name`` in parentheses when ``verbosity`` is 2 or more.
        """
        return "'%s'%s" % (
            self.connection.alias,
            (" ('%s')" % database_name) if verbosity >= 2 else "",
        )

    def _get_test_db_name(self):
        """
        Internal implementation - return the name of the test DB that will be
        created. Only useful when called from create_test_db() and
        _create_test_db() and when no external munging is done with the 'NAME'
        settings.

        An explicit, truthy ``TEST["NAME"]`` setting wins; otherwise the name
        is ``TEST_DATABASE_PREFIX`` followed by the connection's ``NAME``.
        """
        if self.connection.settings_dict["TEST"]["NAME"]:
            return self.connection.settings_dict["TEST"]["NAME"]
        return TEST_DATABASE_PREFIX + self.connection.settings_dict["NAME"]

    def _execute_create_test_db(self, cursor, parameters, keepdb=False):
        """
        Run the CREATE DATABASE statement on ``cursor``.

        ``parameters`` must provide the ``dbname`` and ``suffix`` keys that
        are interpolated into the statement. ``keepdb`` is unused here.
        """
        cursor.execute("CREATE DATABASE %(dbname)s %(suffix)s" % parameters)

    def _create_test_db(self, verbosity, autoclobber, keepdb=False):
        """
        Internal implementation - create the test database.

        Return the test database name. If creation fails and ``keepdb`` is
        true, return without further action. Otherwise, unless ``autoclobber``
        is true, ask the user whether to drop and recreate the database. The
        process exits with status 1 if the user declines and status 2 if the
        recreation fails.
        """
        test_database_name = self._get_test_db_name()
        test_db_params = {
            "dbname": self.connection.ops.quote_name(test_database_name),
            "suffix": self.sql_table_creation_suffix(),
        }
        # Create the test database and connect to it.
        with self._nodb_cursor() as cursor:
            try:
                self._execute_create_test_db(cursor, test_db_params, keepdb)
            except Exception as e:
                # if we want to keep the db, then no need to do any of the
                # below, just return and skip it all.
                if keepdb:
                    return test_database_name

                self.log("Got an error creating the test database: %s" % e)
                # Only prompt when the caller didn't pre-approve clobbering.
                confirm = None
                if not autoclobber:
                    confirm = input(
                        "Type 'yes' if you would like to try deleting the test "
                        "database '%s', or 'no' to cancel: " % test_database_name
                    )
                if autoclobber or confirm == "yes":
                    try:
                        if verbosity >= 1:
                            self.log(
                                "Destroying old test database for alias %s..."
                                % (
                                    self._get_database_display_str(
                                        verbosity, test_database_name
                                    ),
                                )
                            )
                        cursor.execute("DROP DATABASE %(dbname)s" % test_db_params)
                        self._execute_create_test_db(cursor, test_db_params, keepdb)
                    except Exception as e:
                        self.log("Got an error recreating the test database: %s" % e)
                        sys.exit(2)
                else:
                    self.log("Tests cancelled.")
                    sys.exit(1)

        return test_database_name

    def clone_test_db(self, suffix, verbosity=1, autoclobber=False, keepdb=False):
        """
        Clone a test database.

        Log the action when ``verbosity`` >= 1, then delegate to
        ``_clone_test_db()``. ``autoclobber`` is accepted but not used here.
        """
        source_database_name = self.connection.settings_dict["NAME"]

        if verbosity >= 1:
            action = "Cloning test database"
            if keepdb:
                action = "Using existing clone"
            self.log(
                "%s for alias %s..."
                % (
                    action,
                    self._get_database_display_str(verbosity, source_database_name),
                )
            )

        # We could skip this call if keepdb is True, but we instead
        # give it the keepdb param. See create_test_db for details.
        self._clone_test_db(suffix, verbosity, keepdb)

    def get_test_db_clone_settings(self, suffix):
        """
        Return a modified connection settings dict for the n-th clone of a DB.

        The result is a shallow copy of the connection's settings whose
        ``NAME`` is the current name with ``_<suffix>`` appended.
        """
        # When this function is called, the test database has been created
        # already and its name has been copied to settings_dict['NAME'] so
        # we don't need to call _get_test_db_name.
        orig_settings_dict = self.connection.settings_dict
        return {
            **orig_settings_dict,
            "NAME": "{}_{}".format(orig_settings_dict["NAME"], suffix),
        }

    def _clone_test_db(self, suffix, verbosity, keepdb=False):
        """
        Internal implementation - duplicate the test db tables.

        This base implementation always raises NotImplementedError.
        """
        raise NotImplementedError(
            "The database backend doesn't support cloning databases. "
            "Disable the option to run tests in parallel processes."
        )

    def destroy_test_db(
        self, old_database_name=None, verbosity=1, keepdb=False, suffix=None
    ):
        """
        Destroy a test database and restore the original database name.

        The database to destroy is the connection's current ``NAME``, or the
        clone name for ``suffix`` when one is given. With ``keepdb`` the
        database is preserved. When ``old_database_name`` is not None, it is
        written back to both the global settings and the connection settings.
        """
        self.connection.close()
        if suffix is None:
            test_database_name = self.connection.settings_dict["NAME"]
        else:
            test_database_name = self.get_test_db_clone_settings(suffix)["NAME"]

        if verbosity >= 1:
            action = "Destroying"
            if keepdb:
                action = "Preserving"
            self.log(
                "%s test database for alias %s..."
                % (
                    action,
                    self._get_database_display_str(verbosity, test_database_name),
                )
            )

        # if we want to preserve the database
        # skip the actual destroying piece.
        if not keepdb:
            self._destroy_test_db(test_database_name, verbosity)

        # Restore the original database name
        if old_database_name is not None:
            settings.DATABASES[self.connection.alias]["NAME"] = old_database_name
            self.connection.settings_dict["NAME"] = old_database_name

    def _destroy_test_db(self, test_database_name, verbosity):
        """
        Internal implementation - drop the test database.

        ``verbosity`` is accepted but not used here.
        """
        # Remove the test database to clean up after
        # ourselves. Connect to the previous database (not the test database)
        # to do so, because it's not allowed to delete a database while being
        # connected to it.
        with self._nodb_cursor() as cursor:
            cursor.execute(
                "DROP DATABASE %s" % self.connection.ops.quote_name(test_database_name)
            )

    def mark_expected_failures_and_skips(self):
        """
        Mark tests in Django's test suite which are expected failures on this
        database and test which should be skipped on this database.

        Test names come from the connection features'
        ``django_test_expected_failures`` and ``django_test_skips``.
        """
        for test_name in self.connection.features.django_test_expected_failures:
            self._decorate_installed_test(test_name, expectedFailure)
        for reason, tests in self.connection.features.django_test_skips.items():
            for test_name in tests:
                self._decorate_installed_test(test_name, skip(reason))

    def _decorate_installed_test(self, test_name, decorator):
        """
        Replace the test method named by the dotted path ``test_name`` with
        ``decorator(method)``, provided its first path component is listed
        in ``settings.INSTALLED_APPS``.
        """
        test_case_name, _, test_method_name = test_name.rpartition(".")
        test_app = test_name.split(".")[0]
        # Importing a test app that isn't installed raises RuntimeError.
        if test_app in settings.INSTALLED_APPS:
            test_case = import_string(test_case_name)
            test_method = getattr(test_case, test_method_name)
            setattr(test_case, test_method_name, decorator(test_method))

    def sql_table_creation_suffix(self):
        """
        SQL to append to the end of the test database creation statement
        (the ``suffix`` of CREATE DATABASE). Empty in this base class.
        """
        return ""

    def test_db_signature(self):
        """
        Return a tuple with elements of self.connection.settings_dict (a
        DATABASES setting value) that uniquely identify a database
        accordingly to the RDBMS particularities.

        The tuple is (HOST, PORT, ENGINE, test database name).
        """
        settings_dict = self.connection.settings_dict
        return (
            settings_dict["HOST"],
            settings_dict["PORT"],
            settings_dict["ENGINE"],
            self._get_test_db_name(),
        )

369 )