Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/db/backends/base/creation.py: 44%
152 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1import os
2import sys
3from io import StringIO
4from unittest import expectedFailure, skip
6from django.apps import apps
7from django.conf import settings
8from django.core import serializers
9from django.db import router
10from django.db.transaction import atomic
11from django.utils.module_loading import import_string
# Prefix prepended to the configured database name to build the name of the
# test database when no explicit test database name is set.
TEST_DATABASE_PREFIX = "test_"
class BaseDatabaseCreation:
    """
    Encapsulate backend-specific differences pertaining to creation and
    destruction of the test database.
    """

    def __init__(self, connection):
        """Store the database connection wrapper this helper operates on."""
        self.connection = connection

    def _nodb_cursor(self):
        """Return the connection's "no database" cursor context manager."""
        return self.connection._nodb_cursor()

    def log(self, msg):
        """Write `msg` followed by a newline to standard error."""
        # sys.stderr is a text-mode stream, so "\n" is translated to the
        # platform line ending on output. Writing os.linesep here would
        # produce "\r\r\n" on Windows.
        sys.stderr.write(msg + "\n")

    def create_test_db(
        self, verbosity=1, autoclobber=False, serialize=True, keepdb=False
    ):
        """
        Create a test database, prompting the user for confirmation if the
        database already exists. Return the name of the test database created.

        `verbosity` controls logging, `autoclobber` skips the confirmation
        prompt, `serialize` stores a serialized copy of the database contents
        on the connection, and `keepdb` reuses an existing test database.
        """
        # Don't import django.core.management if it isn't needed.
        from django.core.management import call_command

        test_database_name = self._get_test_db_name()

        if verbosity >= 1:
            action = "Creating"
            if keepdb:
                action = "Using existing"

            self.log(
                "%s test database for alias %s..."
                % (
                    action,
                    self._get_database_display_str(verbosity, test_database_name),
                )
            )

        # We could skip this call if keepdb is True, but we instead
        # give it the keepdb param. This is to handle the case
        # where the test DB doesn't exist, in which case we need to
        # create it, then just not destroy it. If we instead skip
        # this, we will get an exception.
        self._create_test_db(verbosity, autoclobber, keepdb)

        self.connection.close()
        settings.DATABASES[self.connection.alias]["NAME"] = test_database_name
        self.connection.settings_dict["NAME"] = test_database_name

        # Read the flag once so that the override and the restore below can
        # never disagree with each other.
        migrations_disabled = (
            self.connection.settings_dict["TEST"]["MIGRATE"] is False
        )
        if migrations_disabled:
            # Disable migrations for all apps. The original value is saved
            # before entering the try block so the finally clause never
            # refers to an unbound name.
            old_migration_modules = settings.MIGRATION_MODULES
            settings.MIGRATION_MODULES = {
                app.label: None for app in apps.get_app_configs()
            }
        try:
            # We report migrate messages at one level lower than that
            # requested. This ensures we don't get flooded with messages during
            # testing (unless you really ask to be flooded).
            call_command(
                "migrate",
                verbosity=max(verbosity - 1, 0),
                interactive=False,
                database=self.connection.alias,
                run_syncdb=True,
            )
        finally:
            if migrations_disabled:
                settings.MIGRATION_MODULES = old_migration_modules

        # We then serialize the current state of the database into a string
        # and store it on the connection. This slightly horrific process is so people
        # who are testing on databases without transactions or who are using
        # a TransactionTestCase still get a clean database on every test run.
        if serialize:
            self.connection._test_serialized_contents = self.serialize_db_to_string()

        call_command("createcachetable", database=self.connection.alias)

        # Ensure a connection for the side effect of initializing the test database.
        self.connection.ensure_connection()

        if os.environ.get("RUNNING_DJANGOS_TEST_SUITE") == "true":
            self.mark_expected_failures_and_skips()

        return test_database_name

    def set_as_test_mirror(self, primary_settings_dict):
        """
        Set this database up to be used in testing as a mirror of a primary
        database whose settings are given.
        """
        self.connection.settings_dict["NAME"] = primary_settings_dict["NAME"]

    def serialize_db_to_string(self):
        """
        Serialize all data in the database into a JSON string.
        Designed only for test runner usage; will not handle large
        amounts of data.
        """

        # Iteratively return every object for all models to serialize.
        def get_objects():
            from django.db.migrations.loader import MigrationLoader

            loader = MigrationLoader(self.connection)
            for app_config in apps.get_app_configs():
                if (
                    app_config.models_module is not None
                    and app_config.label in loader.migrated_apps
                    and app_config.name not in settings.TEST_NON_SERIALIZED_APPS
                ):
                    for model in app_config.get_models():
                        if model._meta.can_migrate(
                            self.connection
                        ) and router.allow_migrate_model(self.connection.alias, model):
                            queryset = model._base_manager.using(
                                self.connection.alias,
                            ).order_by(model._meta.pk.name)
                            yield from queryset.iterator()

        # Serialize to a string
        out = StringIO()
        serializers.serialize("json", get_objects(), indent=None, stream=out)
        return out.getvalue()

    def deserialize_db_from_string(self, data):
        """
        Reload the database with data from a string generated by
        the serialize_db_to_string() method.
        """
        data = StringIO(data)
        table_names = set()
        # Load data in a transaction to handle forward references and cycles.
        with atomic(using=self.connection.alias):
            # Disable constraint checks, because some databases (MySQL) don't
            # support deferred checks.
            with self.connection.constraint_checks_disabled():
                for obj in serializers.deserialize(
                    "json", data, using=self.connection.alias
                ):
                    obj.save()
                    table_names.add(obj.object.__class__._meta.db_table)
            # Manually check for any invalid keys that might have been added,
            # because constraint checks were disabled.
            self.connection.check_constraints(table_names=table_names)

    def _get_database_display_str(self, verbosity, database_name):
        """
        Return display string for a database for use in various actions.

        The string is the quoted connection alias; at verbosity 2 or higher
        the quoted database name is appended in parentheses.
        """
        return "'%s'%s" % (
            self.connection.alias,
            (" ('%s')" % database_name) if verbosity >= 2 else "",
        )

    def _get_test_db_name(self):
        """
        Internal implementation - return the name of the test DB that will be
        created. Only useful when called from create_test_db() and
        _create_test_db() and when no external munging is done with the 'NAME'
        settings.
        """
        # An explicitly configured test name wins; otherwise derive one from
        # the regular database name.
        configured_name = self.connection.settings_dict["TEST"]["NAME"]
        if configured_name:
            return configured_name
        return TEST_DATABASE_PREFIX + self.connection.settings_dict["NAME"]

    def _execute_create_test_db(self, cursor, parameters, keepdb=False):
        """
        Run the CREATE DATABASE statement on `cursor`.

        `parameters` must provide the "dbname" and "suffix" keys. `keepdb` is
        accepted but not used by this base implementation.
        """
        cursor.execute("CREATE DATABASE %(dbname)s %(suffix)s" % parameters)

    def _create_test_db(self, verbosity, autoclobber, keepdb=False):
        """
        Internal implementation - create the test database.

        Return the test database name. If creation fails and `keepdb` is
        false, offer to drop and recreate the database (without asking when
        `autoclobber` is true). Exit the process with status 1 if the user
        declines and with status 2 if recreating the database fails.
        """
        test_database_name = self._get_test_db_name()
        test_db_params = {
            "dbname": self.connection.ops.quote_name(test_database_name),
            "suffix": self.sql_table_creation_suffix(),
        }
        # Create the test database and connect to it.
        with self._nodb_cursor() as cursor:
            try:
                self._execute_create_test_db(cursor, test_db_params, keepdb)
            except Exception as create_error:
                # if we want to keep the db, then no need to do any of the below,
                # just return and skip it all.
                if keepdb:
                    return test_database_name

                self.log(
                    "Got an error creating the test database: %s" % create_error
                )
                if not autoclobber:
                    confirm = input(
                        "Type 'yes' if you would like to try deleting the test "
                        "database '%s', or 'no' to cancel: " % test_database_name
                    )
                # `confirm` is only bound when autoclobber is false; the
                # short-circuiting `or` keeps it from being read otherwise.
                if autoclobber or confirm == "yes":
                    try:
                        if verbosity >= 1:
                            self.log(
                                "Destroying old test database for alias %s..."
                                % (
                                    self._get_database_display_str(
                                        verbosity, test_database_name
                                    ),
                                )
                            )
                        cursor.execute("DROP DATABASE %(dbname)s" % test_db_params)
                        self._execute_create_test_db(cursor, test_db_params, keepdb)
                    except Exception as recreate_error:
                        self.log(
                            "Got an error recreating the test database: %s"
                            % recreate_error
                        )
                        sys.exit(2)
                else:
                    self.log("Tests cancelled.")
                    sys.exit(1)

        return test_database_name

    def clone_test_db(self, suffix, verbosity=1, autoclobber=False, keepdb=False):
        """
        Clone a test database.

        Log the action when `verbosity` is at least 1, then delegate to
        _clone_test_db(). `autoclobber` is accepted but not used here.
        """
        source_database_name = self.connection.settings_dict["NAME"]

        if verbosity >= 1:
            action = "Cloning test database"
            if keepdb:
                action = "Using existing clone"
            self.log(
                "%s for alias %s..."
                % (
                    action,
                    self._get_database_display_str(verbosity, source_database_name),
                )
            )

        # We could skip this call if keepdb is True, but we instead
        # give it the keepdb param. See create_test_db for details.
        self._clone_test_db(suffix, verbosity, keepdb)

    def get_test_db_clone_settings(self, suffix):
        """
        Return a modified connection settings dict for the n-th clone of a DB.

        The result is a new dict; the connection's own settings are left
        untouched.
        """
        # When this function is called, the test database has been created
        # already and its name has been copied to settings_dict['NAME'] so
        # we don't need to call _get_test_db_name.
        orig_settings_dict = self.connection.settings_dict
        return {
            **orig_settings_dict,
            "NAME": "{}_{}".format(orig_settings_dict["NAME"], suffix),
        }

    def _clone_test_db(self, suffix, verbosity, keepdb=False):
        """
        Internal implementation - duplicate the test db tables.

        This base implementation always raises NotImplementedError.
        """
        raise NotImplementedError(
            "The database backend doesn't support cloning databases. "
            "Disable the option to run tests in parallel processes."
        )

    def destroy_test_db(
        self, old_database_name=None, verbosity=1, keepdb=False, suffix=None
    ):
        """
        Destroy a test database, or its clone when `suffix` is given, unless
        `keepdb` is true. Then restore the original database name in the
        settings if `old_database_name` is provided.
        """
        self.connection.close()
        if suffix is None:
            test_database_name = self.connection.settings_dict["NAME"]
        else:
            test_database_name = self.get_test_db_clone_settings(suffix)["NAME"]

        if verbosity >= 1:
            action = "Destroying"
            if keepdb:
                action = "Preserving"
            self.log(
                "%s test database for alias %s..."
                % (
                    action,
                    self._get_database_display_str(verbosity, test_database_name),
                )
            )

        # if we want to preserve the database
        # skip the actual destroying piece.
        if not keepdb:
            self._destroy_test_db(test_database_name, verbosity)

        # Restore the original database name
        if old_database_name is not None:
            settings.DATABASES[self.connection.alias]["NAME"] = old_database_name
            self.connection.settings_dict["NAME"] = old_database_name

    def _destroy_test_db(self, test_database_name, verbosity):
        """
        Internal implementation - drop the test database.

        `verbosity` is accepted but not used by this base implementation.
        """
        # Remove the test database to clean up after
        # ourselves. Connect to the previous database (not the test database)
        # to do so, because it's not allowed to delete a database while being
        # connected to it.
        with self._nodb_cursor() as cursor:
            cursor.execute(
                "DROP DATABASE %s" % self.connection.ops.quote_name(test_database_name)
            )

    def mark_expected_failures_and_skips(self):
        """
        Mark tests in Django's test suite which are expected failures on this
        database and test which should be skipped on this database.
        """
        features = self.connection.features
        for test_name in features.django_test_expected_failures:
            self._decorate_django_test(test_name, expectedFailure)
        for reason, tests in features.django_test_skips.items():
            for test_name in tests:
                self._decorate_django_test(test_name, skip(reason))

    @staticmethod
    def _decorate_django_test(test_name, decorator):
        """
        Replace the test method at dotted path `test_name` with
        `decorator(test_method)`, if its top-level app is installed.
        """
        test_case_name, _, test_method_name = test_name.rpartition(".")
        test_app = test_name.split(".")[0]
        # Importing a test app that isn't installed raises RuntimeError.
        if test_app in settings.INSTALLED_APPS:
            test_case = import_string(test_case_name)
            test_method = getattr(test_case, test_method_name)
            setattr(test_case, test_method_name, decorator(test_method))

    def sql_table_creation_suffix(self):
        """
        SQL to append to the end of the test database creation statement.
        The base implementation adds nothing.
        """
        return ""

    def test_db_signature(self):
        """
        Return a tuple with elements of self.connection.settings_dict (a
        DATABASES setting value) that uniquely identify a database
        accordingly to the RDBMS particularities.
        """
        settings_dict = self.connection.settings_dict
        return (
            settings_dict["HOST"],
            settings_dict["PORT"],
            settings_dict["ENGINE"],
            self._get_test_db_name(),
        )