Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/test/runner.py: 47%
571 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1import argparse
2import ctypes
3import faulthandler
4import hashlib
5import io
6import itertools
7import logging
8import multiprocessing
9import os
10import pickle
11import random
12import sys
13import textwrap
14import unittest
15import warnings
16from collections import defaultdict
17from contextlib import contextmanager
18from importlib import import_module
19from io import StringIO
21from django.core.management import call_command
22from django.db import connections
23from django.test import SimpleTestCase, TestCase
24from django.test.utils import NullTimeKeeper, TimeKeeper, iter_test_cases
25from django.test.utils import setup_databases as _setup_databases
26from django.test.utils import setup_test_environment
27from django.test.utils import teardown_databases as _teardown_databases
28from django.test.utils import teardown_test_environment
29from django.utils.datastructures import OrderedSet
30from django.utils.deprecation import RemovedInDjango50Warning
32try:
33 import ipdb as pdb
34except ImportError:
35 import pdb
37try:
38 import tblib.pickling_support
39except ImportError:
40 tblib = None
class DebugSQLTextTestResult(unittest.TextTestResult):
    """
    TextTestResult subclass that captures the SQL logged by the
    "django.db.backends" logger while each test runs, and appends the
    captured SQL to each recorded failure/error so printErrorList() can
    display it alongside the traceback.
    """

    def __init__(self, stream, descriptions, verbosity):
        self.logger = logging.getLogger("django.db.backends")
        # SQL statements are only emitted at DEBUG level.
        self.logger.setLevel(logging.DEBUG)
        # None until startTest() runs; an error can be recorded earlier
        # (e.g. in setUpTestData()), so readers must handle that case.
        self.debug_sql_stream = None
        super().__init__(stream, descriptions, verbosity)

    def startTest(self, test):
        self.debug_sql_stream = StringIO()
        self.handler = logging.StreamHandler(self.debug_sql_stream)
        self.logger.addHandler(self.handler)
        super().startTest(test)

    def stopTest(self, test):
        super().stopTest(test)
        self.logger.removeHandler(self.handler)
        if self.showAll:
            # In verbose mode, echo the captured SQL after every test.
            self.debug_sql_stream.seek(0)
            self.stream.write(self.debug_sql_stream.read())
            self.stream.writeln(self.separator2)

    def _read_captured_sql(self):
        """Return the SQL captured so far, or "" if startTest() never ran."""
        if self.debug_sql_stream is None:
            # Error before tests e.g. in setUpTestData().
            return ""
        self.debug_sql_stream.seek(0)
        return self.debug_sql_stream.read()

    def addError(self, test, err):
        super().addError(test, err)
        self.errors[-1] = self.errors[-1] + (self._read_captured_sql(),)

    def addFailure(self, test, err):
        super().addFailure(test, err)
        # Guard against a failure recorded before startTest(), mirroring
        # the handling in addError().
        self.failures[-1] = self.failures[-1] + (self._read_captured_sql(),)

    def addSubTest(self, test, subtest, err):
        super().addSubTest(test, subtest, err)
        if err is not None:
            errors = (
                self.failures
                if issubclass(err[0], test.failureException)
                else self.errors
            )
            errors[-1] = errors[-1] + (self._read_captured_sql(),)

    def printErrorList(self, flavour, errors):
        # Each entry carries a third element (the captured SQL) appended by
        # the add*() overrides above.
        for test, err, sql_debug in errors:
            self.stream.writeln(self.separator1)
            self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
            self.stream.writeln(self.separator2)
            self.stream.writeln(err)
            self.stream.writeln(self.separator2)
            self.stream.writeln(sql_debug)
class PDBDebugResult(unittest.TextTestResult):
    """
    Result class that opens a PDB post-mortem session as soon as a test
    errors or fails.
    """

    def addError(self, test, err):
        super().addError(test, err)
        self.debug(err)

    def addFailure(self, test, err):
        super().addFailure(test, err)
        self.debug(err)

    def addSubTest(self, test, subtest, err):
        # A successful subtest passes err=None; only debug real problems.
        if err is not None:
            self.debug(err)
        super().addSubTest(test, subtest, err)

    def debug(self, error):
        # Stop capturing output so the debugger owns the terminal.
        self._restoreStdout()
        self.buffer = False
        _, exc_value, tb = error
        print("\nOpening PDB: %r" % exc_value)
        pdb.post_mortem(tb)
class DummyList:
    """
    No-op stand-in for the result lists on unittest.TestResult: silently
    discards everything appended to it, saving memory.
    """

    __slots__ = ()

    def append(self, item):
        # Deliberately drop the item.
        pass
class RemoteTestResult(unittest.TestResult):
    """
    Extend unittest.TestResult to record events in the child processes so they
    can be replayed in the parent process. Events include things like which
    tests succeeded or failed.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # The default unittest methods append to these lists; replace them
        # with a shared no-op list since 'events' holds the real record.
        dummy_list = DummyList()
        for attr in (
            "failures",
            "errors",
            "skipped",
            "expectedFailures",
            "unexpectedSuccesses",
        ):
            setattr(self, attr, dummy_list)

        if tblib is not None:
            tblib.pickling_support.install()
        self.events = []

    def __getstate__(self):
        # Drop the file-like buffer attributes so this instance pickles;
        # they aren't needed after being sent to ParallelTestSuite.
        buffer_attrs = (
            "_stdout_buffer",
            "_stderr_buffer",
            "_original_stdout",
            "_original_stderr",
        )
        return {
            key: value
            for key, value in self.__dict__.items()
            if key not in buffer_attrs
        }

    @property
    def test_index(self):
        # Index of the currently running test within the subsuite.
        return self.testsRun - 1

    def _confirm_picklable(self, obj):
        """
        Confirm that obj can be pickled and unpickled as multiprocessing will
        need to pickle the exception in the child process and unpickle it in
        the parent process. Let the exception rise, if not.
        """
        pickle.loads(pickle.dumps(obj))

    def _print_unpicklable_subtest(self, test, subtest, pickle_exc):
        print(
            """
Subtest failed:

    test: {}
 subtest: {}

Unfortunately, the subtest that failed cannot be pickled, so the parallel
test runner cannot handle it cleanly. Here is the pickling error:

> {}

You should re-run this test with --parallel=1 to reproduce the failure
with a cleaner failure message.
""".format(
                test, subtest, pickle_exc
            )
        )

    def check_picklable(self, test, err):
        # Ensure that sys.exc_info() tuples are picklable. This displays a
        # clear multiprocessing.pool.RemoteTraceback generated in the child
        # process instead of a multiprocessing.pool.MaybeEncodingError, making
        # the root cause easier to figure out for users who aren't familiar
        # with the multiprocessing module. Since we're in a forked process,
        # our best chance to communicate with them is to print to stdout.
        try:
            self._confirm_picklable(err)
        except Exception as exc:
            exc_text = textwrap.fill(
                repr(err[1]), 75, initial_indent="    ", subsequent_indent="    "
            )
            pickle_exc_text = textwrap.fill(
                repr(exc), 75, initial_indent="    ", subsequent_indent="    "
            )
            if tblib is None:
                print(
                    """
{} failed:

{}

Unfortunately, tracebacks cannot be pickled, making it impossible for the
parallel test runner to handle this exception cleanly.

In order to see the traceback, you should install tblib:

    python -m pip install tblib
""".format(
                        test, exc_text
                    )
                )
            else:
                print(
                    """
{} failed:

{}

Unfortunately, the exception it raised cannot be pickled, making it impossible
for the parallel test runner to handle it cleanly.

Here's the error encountered while trying to pickle the exception:

{}

You should re-run this test with the --parallel=1 option to reproduce the
failure and get a correct traceback.
""".format(
                        test, exc_text, pickle_exc_text
                    )
                )
            raise

    def check_subtest_picklable(self, test, subtest):
        try:
            self._confirm_picklable(subtest)
        except Exception as exc:
            self._print_unpicklable_subtest(test, subtest, exc)
            raise

    def startTestRun(self):
        super().startTestRun()
        self.events.append(("startTestRun",))

    def stopTestRun(self):
        super().stopTestRun()
        self.events.append(("stopTestRun",))

    def startTest(self, test):
        super().startTest(test)
        self.events.append(("startTest", self.test_index))

    def stopTest(self, test):
        super().stopTest(test)
        self.events.append(("stopTest", self.test_index))

    def addError(self, test, err):
        self.check_picklable(test, err)
        self.events.append(("addError", self.test_index, err))
        super().addError(test, err)

    def addFailure(self, test, err):
        self.check_picklable(test, err)
        self.events.append(("addFailure", self.test_index, err))
        super().addFailure(test, err)

    def addSubTest(self, test, subtest, err):
        # Follow Python's implementation of unittest.TestResult.addSubTest()
        # by not doing anything when a subtest is successful.
        if err is not None:
            # Call check_picklable() before check_subtest_picklable() since
            # check_picklable() performs the tblib check.
            self.check_picklable(test, err)
            self.check_subtest_picklable(test, subtest)
            self.events.append(("addSubTest", self.test_index, subtest, err))
        super().addSubTest(test, subtest, err)

    def addSuccess(self, test):
        self.events.append(("addSuccess", self.test_index))
        super().addSuccess(test)

    def addSkip(self, test, reason):
        self.events.append(("addSkip", self.test_index, reason))
        super().addSkip(test, reason)

    def addExpectedFailure(self, test, err):
        # If tblib isn't installed, pickling the traceback will always fail.
        # However we don't want tblib to be required for running the tests
        # when they pass or fail as expected. Drop the traceback when an
        # expected failure occurs.
        if tblib is None:
            err = err[0], err[1], None
        self.check_picklable(test, err)
        self.events.append(("addExpectedFailure", self.test_index, err))
        super().addExpectedFailure(test, err)

    def addUnexpectedSuccess(self, test):
        self.events.append(("addUnexpectedSuccess", self.test_index))
        super().addUnexpectedSuccess(test)

    def wasSuccessful(self):
        """Tells whether or not this result was a success."""
        failure_types = {"addError", "addFailure", "addSubTest", "addUnexpectedSuccess"}
        return not any(event[0] in failure_types for event in self.events)

    def _exc_info_to_string(self, err, test):
        # Make this method a no-op. It only powers the default unittest
        # behavior for recording errors, but this class pickles errors into
        # 'events' instead.
        return ""
class RemoteTestRunner:
    """
    Run tests and record everything but don't display anything.

    The implementation matches the unpythonic coding style of unittest2.
    """

    resultclass = RemoteTestResult

    def __init__(self, failfast=False, resultclass=None, buffer=False):
        self.failfast = failfast
        self.buffer = buffer
        # Allow overriding the class-level default per instance.
        if resultclass is not None:
            self.resultclass = resultclass

    def run(self, test):
        result = self.resultclass()
        unittest.registerResult(result)
        result.failfast = self.failfast
        result.buffer = self.buffer
        test(result)
        return result
def get_max_test_processes():
    """
    The maximum number of test processes when using the --parallel option.
    """
    # The current implementation of the parallel test runner requires
    # multiprocessing to start subprocesses with fork().
    if multiprocessing.get_start_method() != "fork":
        return 1
    env_value = os.environ.get("DJANGO_TEST_PROCESSES")
    if env_value is None:
        return multiprocessing.cpu_count()
    return int(env_value)
def parallel_type(value):
    """Parse value passed to the --parallel option."""
    try:
        # "auto" passes through unchanged; anything else must be an int.
        return value if value == "auto" else int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(
            f"{value!r} is not an integer or the string 'auto'"
        )
_worker_id = 0


def _init_worker(counter):
    """
    Switch to databases dedicated to this worker.

    This helper lives at module-level because of the multiprocessing module's
    requirements.
    """
    global _worker_id

    # Atomically claim the next worker id from the shared counter.
    with counter.get_lock():
        counter.value += 1
        _worker_id = counter.value

    for alias in connections:
        connection = connections[alias]
        clone_settings = connection.creation.get_test_db_clone_settings(
            str(_worker_id)
        )
        # connection.settings_dict must be updated in place for changes to be
        # reflected in django.db.connections. If the following line assigned
        # connection.settings_dict = clone_settings, new threads would connect
        # to the default database instead of the appropriate clone.
        connection.settings_dict.update(clone_settings)
        connection.close()
419def _run_subsuite(args):
420 """
421 Run a suite of tests with a RemoteTestRunner and return a RemoteTestResult.
423 This helper lives at module-level and its arguments are wrapped in a tuple
424 because of the multiprocessing module's requirements.
425 """
426 runner_class, subsuite_index, subsuite, failfast, buffer = args
427 runner = runner_class(failfast=failfast, buffer=buffer)
428 result = runner.run(subsuite)
429 return subsuite_index, result.events
class ParallelTestSuite(unittest.TestSuite):
    """
    Run a series of tests in parallel in several processes.

    While the unittest module's documentation implies that orchestrating the
    execution of tests is the responsibility of the test runner, in practice,
    it appears that TestRunner classes are more concerned with formatting and
    displaying test results.

    Since there are fewer use cases for customizing TestSuite than TestRunner,
    implementing parallelization at the level of the TestSuite improves
    interoperability with existing custom test runners. A single instance of a
    test runner can still collect results from all tests without being aware
    that they have been run in parallel.
    """

    # In case someone wants to modify these in a subclass.
    init_worker = _init_worker
    run_subsuite = _run_subsuite
    runner_class = RemoteTestRunner

    def __init__(self, subsuites, processes, failfast=False, buffer=False):
        self.subsuites = subsuites
        self.processes = processes
        self.failfast = failfast
        self.buffer = buffer
        super().__init__()

    def run(self, result):
        """
        Distribute test cases across workers.

        Return an identifier of each test case with its result in order to use
        imap_unordered to show results as soon as they're available.

        To minimize pickling errors when getting results from workers:

        - pass back numeric indexes in self.subsuites instead of tests
        - make tracebacks picklable with tblib, if available

        Even with tblib, errors may still occur for dynamically created
        exception classes which cannot be unpickled.
        """
        counter = multiprocessing.Value(ctypes.c_int, 0)
        pool = multiprocessing.Pool(
            processes=self.processes,
            # .__func__ unwraps the bound methods created by the class-level
            # attribute assignments above.
            initializer=self.init_worker.__func__,
            initargs=[counter],
        )
        subsuite_args = [
            (self.runner_class, index, subsuite, self.failfast, self.buffer)
            for index, subsuite in enumerate(self.subsuites)
        ]
        test_results = pool.imap_unordered(self.run_subsuite.__func__, subsuite_args)

        while True:
            if result.shouldStop:
                pool.terminate()
                break

            try:
                # Poll with a short timeout so shouldStop is honored promptly.
                subsuite_index, events = test_results.next(timeout=0.1)
            except multiprocessing.TimeoutError:
                continue
            except StopIteration:
                pool.close()
                break

            # Replay the child process's events against the parent result.
            tests = list(self.subsuites[subsuite_index])
            for event in events:
                event_name = event[0]
                handler = getattr(result, event_name, None)
                if handler is None:
                    continue
                test = tests[event[1]]
                handler(test, *event[2:])

        pool.join()

        return result

    def __iter__(self):
        return iter(self.subsuites)
class Shuffler:
    """
    This class implements shuffling with a special consistency property.
    Consistency means that, for a given seed and key function, if two sets of
    items are shuffled, the resulting order will agree on the intersection of
    the two sets. For example, if items are removed from an original set, the
    shuffled order for the new set will be the shuffled order of the original
    set restricted to the smaller set.
    """

    # This doesn't need to be cryptographically strong, so use what's fastest.
    hash_algorithm = "md5"

    @classmethod
    def _hash_text(cls, text):
        hasher = hashlib.new(cls.hash_algorithm)
        hasher.update(text.encode("utf-8"))
        return hasher.hexdigest()

    def __init__(self, seed=None):
        if seed is None:
            # Limit seeds to 10 digits for simpler output.
            seed = random.randint(0, 10**10 - 1)
            seed_source = "generated"
        else:
            seed_source = "given"
        self.seed = seed
        self.seed_source = seed_source

    @property
    def seed_display(self):
        return f"{self.seed!r} ({self.seed_source})"

    def _hash_item(self, item, key):
        # Prefixing with the seed makes the ordering seed-dependent.
        return self._hash_text("{}{}".format(self.seed, key(item)))

    def shuffle(self, items, key):
        """
        Return a new list of the items in a shuffled order.

        The `key` is a function that accepts an item in `items` and returns
        a string unique for that item that can be viewed as a string id. The
        order of the return value is deterministic. It depends on the seed
        and key function but not on the original order.
        """
        hashes = {}
        for item in items:
            hashed = self._hash_item(item, key)
            if hashed in hashes:
                raise RuntimeError(
                    "item {!r} has same hash {!r} as item {!r}".format(
                        item, hashed, hashes[hashed]
                    )
                )
            hashes[hashed] = item
        # Sorting the hex digests yields the deterministic shuffled order.
        return [hashes[hashed] for hashed in sorted(hashes)]
class DiscoverRunner:
    """A Django test runner that uses unittest2 test discovery."""

    test_suite = unittest.TestSuite
    parallel_test_suite = ParallelTestSuite
    test_runner = unittest.TextTestRunner
    test_loader = unittest.defaultTestLoader
    reorder_by = (TestCase, SimpleTestCase)

    def __init__(
        self,
        pattern=None,
        top_level=None,
        verbosity=1,
        interactive=True,
        failfast=False,
        keepdb=False,
        reverse=False,
        debug_mode=False,
        debug_sql=False,
        parallel=0,
        tags=None,
        exclude_tags=None,
        test_name_patterns=None,
        pdb=False,
        buffer=False,
        enable_faulthandler=True,
        timing=False,
        shuffle=False,
        logger=None,
        **kwargs,
    ):
        self.pattern = pattern
        self.top_level = top_level
        self.verbosity = verbosity
        self.interactive = interactive
        self.failfast = failfast
        self.keepdb = keepdb
        self.reverse = reverse
        self.debug_mode = debug_mode
        self.debug_sql = debug_sql
        self.parallel = parallel
        self.tags = set(tags or [])
        self.exclude_tags = set(exclude_tags or [])
        if not faulthandler.is_enabled() and enable_faulthandler:
            try:
                faulthandler.enable(file=sys.stderr.fileno())
            except (AttributeError, io.UnsupportedOperation):
                # stderr may be replaced by an object without fileno()
                # (e.g. under some test harnesses); fall back to the
                # original stream.
                faulthandler.enable(file=sys.__stderr__.fileno())
        self.pdb = pdb
        if self.pdb and self.parallel > 1:
            raise ValueError(
                "You cannot use --pdb with parallel tests; pass --parallel=1 to use it."
            )
        self.buffer = buffer
        self.test_name_patterns = None
        self.time_keeper = TimeKeeper() if timing else NullTimeKeeper()
        if test_name_patterns:
            # unittest does not export the _convert_select_pattern function
            # that converts command-line arguments to patterns.
            self.test_name_patterns = {
                pattern if "*" in pattern else "*%s*" % pattern
                for pattern in test_name_patterns
            }
        self.shuffle = shuffle
        self._shuffler = None
        self.logger = logger

    @classmethod
    def add_arguments(cls, parser):
        parser.add_argument(
            "-t",
            "--top-level-directory",
            dest="top_level",
            help="Top level of project for unittest discovery.",
        )
        parser.add_argument(
            "-p",
            "--pattern",
            default="test*.py",
            help="The test matching pattern. Defaults to test*.py.",
        )
        parser.add_argument(
            "--keepdb", action="store_true", help="Preserves the test DB between runs."
        )
        parser.add_argument(
            "--shuffle",
            nargs="?",
            default=False,
            type=int,
            metavar="SEED",
            help="Shuffles test case order.",
        )
        parser.add_argument(
            "-r",
            "--reverse",
            action="store_true",
            help="Reverses test case order.",
        )
        parser.add_argument(
            "--debug-mode",
            action="store_true",
            help="Sets settings.DEBUG to True.",
        )
        parser.add_argument(
            "-d",
            "--debug-sql",
            action="store_true",
            help="Prints logged SQL queries on failure.",
        )
        parser.add_argument(
            "--parallel",
            nargs="?",
            const="auto",
            default=0,
            type=parallel_type,
            metavar="N",
            help=(
                "Run tests using up to N parallel processes. Use the value "
                '"auto" to run one test process for each processor core.'
            ),
        )
        parser.add_argument(
            "--tag",
            action="append",
            dest="tags",
            help="Run only tests with the specified tag. Can be used multiple times.",
        )
        parser.add_argument(
            "--exclude-tag",
            action="append",
            dest="exclude_tags",
            help="Do not run tests with the specified tag. Can be used multiple times.",
        )
        parser.add_argument(
            "--pdb",
            action="store_true",
            help="Runs a debugger (pdb, or ipdb if installed) on error or failure.",
        )
        parser.add_argument(
            "-b",
            "--buffer",
            action="store_true",
            help="Discard output from passing tests.",
        )
        parser.add_argument(
            "--no-faulthandler",
            action="store_false",
            dest="enable_faulthandler",
            help="Disables the Python faulthandler module during tests.",
        )
        parser.add_argument(
            "--timing",
            action="store_true",
            help=("Output timings, including database set up and total run time."),
        )
        parser.add_argument(
            "-k",
            action="append",
            dest="test_name_patterns",
            help=(
                "Only run test methods and classes that match the pattern "
                "or substring. Can be used multiple times. Same as "
                "unittest -k option."
            ),
        )

    @property
    def shuffle_seed(self):
        # None until setup_shuffler() created a Shuffler.
        if self._shuffler is None:
            return None
        return self._shuffler.seed

    def log(self, msg, level=None):
        """
        Log the message at the given logging level (the default is INFO).

        If a logger isn't set, the message is instead printed to the console,
        respecting the configured verbosity. A verbosity of 0 prints no output,
        a verbosity of 1 prints INFO and above, and a verbosity of 2 or higher
        prints all levels.
        """
        if level is None:
            level = logging.INFO
        if self.logger is None:
            if self.verbosity <= 0 or (self.verbosity == 1 and level < logging.INFO):
                return
            print(msg)
        else:
            self.logger.log(level, msg)

    def setup_test_environment(self, **kwargs):
        setup_test_environment(debug=self.debug_mode)
        unittest.installHandler()

    def setup_shuffler(self):
        # --shuffle defaults to False; any other value (including 0) enables
        # shuffling with that seed.
        if self.shuffle is False:
            return
        shuffler = Shuffler(seed=self.shuffle)
        self.log(f"Using shuffle seed: {shuffler.seed_display}")
        self._shuffler = shuffler

    @contextmanager
    def load_with_patterns(self):
        """Temporarily install self.test_name_patterns on the test loader."""
        original_test_name_patterns = self.test_loader.testNamePatterns
        self.test_loader.testNamePatterns = self.test_name_patterns
        try:
            yield
        finally:
            # Restore the original patterns.
            self.test_loader.testNamePatterns = original_test_name_patterns

    def load_tests_for_label(self, label, discover_kwargs):
        """Load the tests named by a single test label, via name or discovery."""
        label_as_path = os.path.abspath(label)
        tests = None

        # If a module, or "module.ClassName[.method_name]", just run those.
        if not os.path.exists(label_as_path):
            with self.load_with_patterns():
                tests = self.test_loader.loadTestsFromName(label)
            if tests.countTestCases():
                return tests
        # Try discovery if "label" is a package or directory.
        is_importable, is_package = try_importing(label)
        if is_importable:
            if not is_package:
                return tests
        elif not os.path.isdir(label_as_path):
            if os.path.exists(label_as_path):
                assert tests is None
                raise RuntimeError(
                    f"One of the test labels is a path to a file: {label!r}, "
                    f"which is not supported. Use a dotted module name or "
                    f"path to a directory instead."
                )
            return tests

        kwargs = discover_kwargs.copy()
        if os.path.isdir(label_as_path) and not self.top_level:
            kwargs["top_level_dir"] = find_top_level(label_as_path)

        with self.load_with_patterns():
            tests = self.test_loader.discover(start_dir=label, **kwargs)

        # Make unittest forget the top-level dir it calculated from this run,
        # to support running tests from two different top-levels.
        self.test_loader._top_level_dir = None
        return tests

    def build_suite(self, test_labels=None, extra_tests=None, **kwargs):
        if extra_tests is not None:
            warnings.warn(
                "The extra_tests argument is deprecated.",
                RemovedInDjango50Warning,
                stacklevel=2,
            )
        test_labels = test_labels or ["."]
        extra_tests = extra_tests or []

        discover_kwargs = {}
        if self.pattern is not None:
            discover_kwargs["pattern"] = self.pattern
        if self.top_level is not None:
            discover_kwargs["top_level_dir"] = self.top_level
        self.setup_shuffler()

        all_tests = []
        for label in test_labels:
            tests = self.load_tests_for_label(label, discover_kwargs)
            all_tests.extend(iter_test_cases(tests))

        all_tests.extend(iter_test_cases(extra_tests))

        if self.tags or self.exclude_tags:
            if self.tags:
                self.log(
                    "Including test tag(s): %s." % ", ".join(sorted(self.tags)),
                    level=logging.DEBUG,
                )
            if self.exclude_tags:
                self.log(
                    "Excluding test tag(s): %s." % ", ".join(sorted(self.exclude_tags)),
                    level=logging.DEBUG,
                )
            all_tests = filter_tests_by_tags(all_tests, self.tags, self.exclude_tags)

        # Put the failures detected at load time first for quicker feedback.
        # _FailedTest objects include things like test modules that couldn't be
        # found or that couldn't be loaded due to syntax errors.
        test_types = (unittest.loader._FailedTest, *self.reorder_by)
        all_tests = list(
            reorder_tests(
                all_tests,
                test_types,
                shuffler=self._shuffler,
                reverse=self.reverse,
            )
        )
        self.log("Found %d test(s)." % len(all_tests))
        suite = self.test_suite(all_tests)

        if self.parallel > 1:
            subsuites = partition_suite_by_case(suite)
            # Since tests are distributed across processes on a per-TestCase
            # basis, there's no need for more processes than TestCases.
            processes = min(self.parallel, len(subsuites))
            # Update also "parallel" because it's used to determine the number
            # of test databases.
            self.parallel = processes
            if processes > 1:
                suite = self.parallel_test_suite(
                    subsuites,
                    processes,
                    self.failfast,
                    self.buffer,
                )
        return suite

    def setup_databases(self, **kwargs):
        return _setup_databases(
            self.verbosity,
            self.interactive,
            time_keeper=self.time_keeper,
            keepdb=self.keepdb,
            debug_sql=self.debug_sql,
            parallel=self.parallel,
            **kwargs,
        )

    def get_resultclass(self):
        # Returns None when neither option is active, letting the runner use
        # its default result class.
        if self.debug_sql:
            return DebugSQLTextTestResult
        elif self.pdb:
            return PDBDebugResult

    def get_test_runner_kwargs(self):
        return {
            "failfast": self.failfast,
            "resultclass": self.get_resultclass(),
            "verbosity": self.verbosity,
            "buffer": self.buffer,
        }

    def run_checks(self, databases):
        # Checks are run after database creation since some checks require
        # database access.
        call_command("check", verbosity=self.verbosity, databases=databases)

    def run_suite(self, suite, **kwargs):
        kwargs = self.get_test_runner_kwargs()
        runner = self.test_runner(**kwargs)
        try:
            return runner.run(suite)
        finally:
            if self._shuffler is not None:
                seed_display = self._shuffler.seed_display
                self.log(f"Used shuffle seed: {seed_display}")

    def teardown_databases(self, old_config, **kwargs):
        """Destroy all the non-mirror databases."""
        _teardown_databases(
            old_config,
            verbosity=self.verbosity,
            parallel=self.parallel,
            keepdb=self.keepdb,
        )

    def teardown_test_environment(self, **kwargs):
        unittest.removeHandler()
        teardown_test_environment()

    def suite_result(self, suite, result, **kwargs):
        return len(result.failures) + len(result.errors)

    def _get_databases(self, suite):
        # Map each database alias used by the suite to whether any test on it
        # requires serialized rollback.
        databases = {}
        for test in iter_test_cases(suite):
            test_databases = getattr(test, "databases", None)
            if test_databases == "__all__":
                test_databases = connections
            if test_databases:
                serialized_rollback = getattr(test, "serialized_rollback", False)
                databases.update(
                    (alias, serialized_rollback or databases.get(alias, False))
                    for alias in test_databases
                )
        return databases

    def get_databases(self, suite):
        databases = self._get_databases(suite)
        unused_databases = [alias for alias in connections if alias not in databases]
        if unused_databases:
            self.log(
                "Skipping setup of unused database(s): %s."
                % ", ".join(sorted(unused_databases)),
                level=logging.DEBUG,
            )
        return databases

    def run_tests(self, test_labels, extra_tests=None, **kwargs):
        """
        Run the unit tests for all the test labels in the provided list.

        Test labels should be dotted Python paths to test modules, test
        classes, or test methods.

        Return the number of tests that failed.
        """
        if extra_tests is not None:
            warnings.warn(
                "The extra_tests argument is deprecated.",
                RemovedInDjango50Warning,
                stacklevel=2,
            )
        self.setup_test_environment()
        suite = self.build_suite(test_labels, extra_tests)
        databases = self.get_databases(suite)
        serialized_aliases = set(
            alias for alias, serialize in databases.items() if serialize
        )
        with self.time_keeper.timed("Total database setup"):
            old_config = self.setup_databases(
                aliases=databases,
                serialized_aliases=serialized_aliases,
            )
        run_failed = False
        try:
            self.run_checks(databases)
            result = self.run_suite(suite)
        except Exception:
            run_failed = True
            raise
        finally:
            try:
                with self.time_keeper.timed("Total database teardown"):
                    self.teardown_databases(old_config)
                self.teardown_test_environment()
            except Exception:
                # Silence teardown exceptions if an exception was raised during
                # runs to avoid shadowing it.
                if not run_failed:
                    raise
        self.time_keeper.print_results()
        return self.suite_result(suite, result)
def try_importing(label):
    """
    Try importing a test label, and return (is_importable, is_package).

    Relative labels like "." and ".." are seen as directories.
    """
    try:
        imported = import_module(label)
    except (ImportError, TypeError):
        # TypeError covers path-like labels such as "." or "..".
        is_importable = is_package = False
    else:
        is_importable = True
        # Only packages expose __path__.
        is_package = hasattr(imported, "__path__")
    return (is_importable, is_package)
def find_top_level(top_level):
    # Try to be a bit smarter than unittest about finding the default top-level
    # for a given directory path, to avoid breaking relative imports.
    # (Unittest's default is to set top-level equal to the path, which means
    # relative imports will result in "Attempted relative import in
    # non-package.").

    # We'd be happy to skip this and require dotted module paths (which don't
    # cause this problem) instead of file paths (which do), but in the case of
    # a directory in the cwd, which would be equally valid if considered as a
    # top-level module or as a directory path, unittest unfortunately prefers
    # the latter.

    # Climb upward while the current directory is itself a package (contains
    # an __init__.py), stopping once we'd loop at the filesystem root.
    while os.path.exists(os.path.join(top_level, "__init__.py")):
        parent = os.path.dirname(top_level)
        if parent == top_level:
            # __init__.py all the way down to the root? give up.
            break
        top_level = parent
    return top_level
def _class_shuffle_key(cls):
    """Return a stable shuffle key: the class's fully dotted path."""
    return ".".join((cls.__module__, cls.__qualname__))
def shuffle_tests(tests, shuffler):
    """
    Return an iterator over the given tests in a shuffled order, keeping tests
    next to other tests of their class.

    `tests` should be an iterable of tests.
    """
    shuffled_by_class = {}
    for _, group in itertools.groupby(tests, type):
        group = list(group)
        test_class = type(group[0])
        # Shuffle within each class, keyed by the test id for determinism.
        shuffled_by_class[test_class] = shuffler.shuffle(
            group, key=lambda test: test.id()
        )

    # Shuffle the classes themselves, then emit each class's tests in order.
    class_order = shuffler.shuffle(shuffled_by_class, key=_class_shuffle_key)

    return itertools.chain.from_iterable(
        shuffled_by_class[test_class] for test_class in class_order
    )
def reorder_test_bin(tests, shuffler=None, reverse=False):
    """
    Return an iterator that reorders the given tests, keeping tests next to
    other tests of their class.

    `tests` should be an iterable of tests that supports reversed().
    """
    if shuffler is not None:
        shuffled = shuffle_tests(tests, shuffler)
        # reversed() needs a sequence, so materialize the shuffled iterator.
        return reversed(list(shuffled)) if reverse else shuffled
    if reverse:
        return reversed(tests)
    # The caller always expects an iterator, even with no reordering.
    return iter(tests)
def reorder_tests(tests, classes, reverse=False, shuffler=None):
    """
    Reorder an iterable of tests, grouping by the given TestCase classes.

    This function also removes any duplicates and reorders so that tests of the
    same type are consecutive.

    The result is returned as an iterator. `classes` is a sequence of types.
    Tests that are instances of `classes[0]` are grouped first, followed by
    instances of `classes[1]`, etc. Tests that are not instances of any of the
    classes are grouped last.

    If `reverse` is True, the tests within each `classes` group are reversed,
    but without reversing the order of `classes` itself.

    The `shuffler` argument is an optional instance of this module's `Shuffler`
    class. If provided, tests will be shuffled within each `classes` group, but
    keeping tests with other tests of their TestCase class. Reversing is
    applied after shuffling to allow reversing the same random order.
    """
    # One bin per entry in `classes`, plus a final bin for everything else.
    # Each bin maps a TestCase class to an OrderedSet of its tests, so tests
    # of one class stay grouped (and deduplicated) even when supplied
    # non-consecutively.
    *class_bins, other_bin = [
        defaultdict(OrderedSet) for _ in range(len(classes) + 1)
    ]

    for test in tests:
        target_bin = other_bin
        for candidate_bin, test_class in zip(class_bins, classes):
            if isinstance(test, test_class):
                target_bin = candidate_bin
                break
        target_bin[type(test)].add(test)

    for current_bin in (*class_bins, other_bin):
        # reorder_test_bin()'s input must support reversed(), hence list().
        grouped = list(itertools.chain.from_iterable(current_bin.values()))
        yield from reorder_test_bin(grouped, shuffler=shuffler, reverse=reverse)
def partition_suite_by_case(suite):
    """Partition a test suite by test case, preserving the order of tests."""
    suite_class = type(suite)
    return [
        suite_class(case_tests)
        for _, case_tests in itertools.groupby(iter_test_cases(suite), type)
    ]
1152def test_match_tags(test, tags, exclude_tags):
1153 if isinstance(test, unittest.loader._FailedTest):
1154 # Tests that couldn't load always match to prevent tests from falsely
1155 # passing due e.g. to syntax errors.
1156 return True
1157 test_tags = set(getattr(test, "tags", []))
1158 test_fn_name = getattr(test, "_testMethodName", str(test))
1159 if hasattr(test, test_fn_name):
1160 test_fn = getattr(test, test_fn_name)
1161 test_fn_tags = list(getattr(test_fn, "tags", []))
1162 test_tags = test_tags.union(test_fn_tags)
1163 if tags and test_tags.isdisjoint(tags):
1164 return False
1165 return test_tags.isdisjoint(exclude_tags)
def filter_tests_by_tags(tests, tags, exclude_tags):
    """Return the matching tests as an iterator."""
    for test in tests:
        if test_match_tags(test, tags, exclude_tags):
            yield test