Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/test/runner.py: 47%

571 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1import argparse 

2import ctypes 

3import faulthandler 

4import hashlib 

5import io 

6import itertools 

7import logging 

8import multiprocessing 

9import os 

10import pickle 

11import random 

12import sys 

13import textwrap 

14import unittest 

15import warnings 

16from collections import defaultdict 

17from contextlib import contextmanager 

18from importlib import import_module 

19from io import StringIO 

20 

21from django.core.management import call_command 

22from django.db import connections 

23from django.test import SimpleTestCase, TestCase 

24from django.test.utils import NullTimeKeeper, TimeKeeper, iter_test_cases 

25from django.test.utils import setup_databases as _setup_databases 

26from django.test.utils import setup_test_environment 

27from django.test.utils import teardown_databases as _teardown_databases 

28from django.test.utils import teardown_test_environment 

29from django.utils.datastructures import OrderedSet 

30from django.utils.deprecation import RemovedInDjango50Warning 

31 

32try: 

33 import ipdb as pdb 

34except ImportError: 

35 import pdb 

36 

37try: 

38 import tblib.pickling_support 

39except ImportError: 

40 tblib = None 

41 

42 

43class DebugSQLTextTestResult(unittest.TextTestResult): 

44 def __init__(self, stream, descriptions, verbosity): 

45 self.logger = logging.getLogger("django.db.backends") 

46 self.logger.setLevel(logging.DEBUG) 

47 self.debug_sql_stream = None 

48 super().__init__(stream, descriptions, verbosity) 

49 

50 def startTest(self, test): 

51 self.debug_sql_stream = StringIO() 

52 self.handler = logging.StreamHandler(self.debug_sql_stream) 

53 self.logger.addHandler(self.handler) 

54 super().startTest(test) 

55 

56 def stopTest(self, test): 

57 super().stopTest(test) 

58 self.logger.removeHandler(self.handler) 

59 if self.showAll: 

60 self.debug_sql_stream.seek(0) 

61 self.stream.write(self.debug_sql_stream.read()) 

62 self.stream.writeln(self.separator2) 

63 

64 def addError(self, test, err): 

65 super().addError(test, err) 

66 if self.debug_sql_stream is None: 

67 # Error before tests e.g. in setUpTestData(). 

68 sql = "" 

69 else: 

70 self.debug_sql_stream.seek(0) 

71 sql = self.debug_sql_stream.read() 

72 self.errors[-1] = self.errors[-1] + (sql,) 

73 

74 def addFailure(self, test, err): 

75 super().addFailure(test, err) 

76 self.debug_sql_stream.seek(0) 

77 self.failures[-1] = self.failures[-1] + (self.debug_sql_stream.read(),) 

78 

79 def addSubTest(self, test, subtest, err): 

80 super().addSubTest(test, subtest, err) 

81 if err is not None: 

82 self.debug_sql_stream.seek(0) 

83 errors = ( 

84 self.failures 

85 if issubclass(err[0], test.failureException) 

86 else self.errors 

87 ) 

88 errors[-1] = errors[-1] + (self.debug_sql_stream.read(),) 

89 

90 def printErrorList(self, flavour, errors): 

91 for test, err, sql_debug in errors: 

92 self.stream.writeln(self.separator1) 

93 self.stream.writeln("%s: %s" % (flavour, self.getDescription(test))) 

94 self.stream.writeln(self.separator2) 

95 self.stream.writeln(err) 

96 self.stream.writeln(self.separator2) 

97 self.stream.writeln(sql_debug) 

98 

99 

100class PDBDebugResult(unittest.TextTestResult): 

101 """ 

102 Custom result class that triggers a PDB session when an error or failure 

103 occurs. 

104 """ 

105 

106 def addError(self, test, err): 

107 super().addError(test, err) 

108 self.debug(err) 

109 

110 def addFailure(self, test, err): 

111 super().addFailure(test, err) 

112 self.debug(err) 

113 

114 def addSubTest(self, test, subtest, err): 

115 if err is not None: 

116 self.debug(err) 

117 super().addSubTest(test, subtest, err) 

118 

119 def debug(self, error): 

120 self._restoreStdout() 

121 self.buffer = False 

122 exc_type, exc_value, traceback = error 

123 print("\nOpening PDB: %r" % exc_value) 

124 pdb.post_mortem(traceback) 

125 

126 

127class DummyList: 

128 """ 

129 Dummy list class for faking storage of results in unittest.TestResult. 

130 """ 

131 

132 __slots__ = () 

133 

134 def append(self, item): 

135 pass 

136 

137 

138class RemoteTestResult(unittest.TestResult): 

139 """ 

140 Extend unittest.TestResult to record events in the child processes so they 

141 can be replayed in the parent process. Events include things like which 

142 tests succeeded or failed. 

143 """ 

144 

145 def __init__(self, *args, **kwargs): 

146 super().__init__(*args, **kwargs) 

147 # Fake storage of results to reduce memory usage. These are used by the 

148 # unittest default methods, but here 'events' is used instead. 

149 dummy_list = DummyList() 

150 self.failures = dummy_list 

151 self.errors = dummy_list 

152 self.skipped = dummy_list 

153 self.expectedFailures = dummy_list 

154 self.unexpectedSuccesses = dummy_list 

155 

156 if tblib is not None: 

157 tblib.pickling_support.install() 

158 self.events = [] 

159 

160 def __getstate__(self): 

161 # Make this class picklable by removing the file-like buffer 

162 # attributes. This is possible since they aren't used after unpickling 

163 # after being sent to ParallelTestSuite. 

164 state = self.__dict__.copy() 

165 state.pop("_stdout_buffer", None) 

166 state.pop("_stderr_buffer", None) 

167 state.pop("_original_stdout", None) 

168 state.pop("_original_stderr", None) 

169 return state 

170 

171 @property 

172 def test_index(self): 

173 return self.testsRun - 1 

174 

175 def _confirm_picklable(self, obj): 

176 """ 

177 Confirm that obj can be pickled and unpickled as multiprocessing will 

178 need to pickle the exception in the child process and unpickle it in 

179 the parent process. Let the exception rise, if not. 

180 """ 

181 pickle.loads(pickle.dumps(obj)) 

182 

183 def _print_unpicklable_subtest(self, test, subtest, pickle_exc): 

184 print( 

185 """ 

186Subtest failed: 

187 

188 test: {} 

189 subtest: {} 

190 

191Unfortunately, the subtest that failed cannot be pickled, so the parallel 

192test runner cannot handle it cleanly. Here is the pickling error: 

193 

194> {} 

195 

196You should re-run this test with --parallel=1 to reproduce the failure 

197with a cleaner failure message. 

198""".format( 

199 test, subtest, pickle_exc 

200 ) 

201 ) 

202 

203 def check_picklable(self, test, err): 

204 # Ensure that sys.exc_info() tuples are picklable. This displays a 

205 # clear multiprocessing.pool.RemoteTraceback generated in the child 

206 # process instead of a multiprocessing.pool.MaybeEncodingError, making 

207 # the root cause easier to figure out for users who aren't familiar 

208 # with the multiprocessing module. Since we're in a forked process, 

209 # our best chance to communicate with them is to print to stdout. 

210 try: 

211 self._confirm_picklable(err) 

212 except Exception as exc: 

213 original_exc_txt = repr(err[1]) 

214 original_exc_txt = textwrap.fill( 

215 original_exc_txt, 75, initial_indent=" ", subsequent_indent=" " 

216 ) 

217 pickle_exc_txt = repr(exc) 

218 pickle_exc_txt = textwrap.fill( 

219 pickle_exc_txt, 75, initial_indent=" ", subsequent_indent=" " 

220 ) 

221 if tblib is None: 

222 print( 

223 """ 

224 

225{} failed: 

226 

227{} 

228 

229Unfortunately, tracebacks cannot be pickled, making it impossible for the 

230parallel test runner to handle this exception cleanly. 

231 

232In order to see the traceback, you should install tblib: 

233 

234 python -m pip install tblib 

235""".format( 

236 test, original_exc_txt 

237 ) 

238 ) 

239 else: 

240 print( 

241 """ 

242 

243{} failed: 

244 

245{} 

246 

247Unfortunately, the exception it raised cannot be pickled, making it impossible 

248for the parallel test runner to handle it cleanly. 

249 

250Here's the error encountered while trying to pickle the exception: 

251 

252{} 

253 

254You should re-run this test with the --parallel=1 option to reproduce the 

255failure and get a correct traceback. 

256""".format( 

257 test, original_exc_txt, pickle_exc_txt 

258 ) 

259 ) 

260 raise 

261 

262 def check_subtest_picklable(self, test, subtest): 

263 try: 

264 self._confirm_picklable(subtest) 

265 except Exception as exc: 

266 self._print_unpicklable_subtest(test, subtest, exc) 

267 raise 

268 

269 def startTestRun(self): 

270 super().startTestRun() 

271 self.events.append(("startTestRun",)) 

272 

273 def stopTestRun(self): 

274 super().stopTestRun() 

275 self.events.append(("stopTestRun",)) 

276 

277 def startTest(self, test): 

278 super().startTest(test) 

279 self.events.append(("startTest", self.test_index)) 

280 

281 def stopTest(self, test): 

282 super().stopTest(test) 

283 self.events.append(("stopTest", self.test_index)) 

284 

285 def addError(self, test, err): 

286 self.check_picklable(test, err) 

287 self.events.append(("addError", self.test_index, err)) 

288 super().addError(test, err) 

289 

290 def addFailure(self, test, err): 

291 self.check_picklable(test, err) 

292 self.events.append(("addFailure", self.test_index, err)) 

293 super().addFailure(test, err) 

294 

295 def addSubTest(self, test, subtest, err): 

296 # Follow Python's implementation of unittest.TestResult.addSubTest() by 

297 # not doing anything when a subtest is successful. 

298 if err is not None: 

299 # Call check_picklable() before check_subtest_picklable() since 

300 # check_picklable() performs the tblib check. 

301 self.check_picklable(test, err) 

302 self.check_subtest_picklable(test, subtest) 

303 self.events.append(("addSubTest", self.test_index, subtest, err)) 

304 super().addSubTest(test, subtest, err) 

305 

306 def addSuccess(self, test): 

307 self.events.append(("addSuccess", self.test_index)) 

308 super().addSuccess(test) 

309 

310 def addSkip(self, test, reason): 

311 self.events.append(("addSkip", self.test_index, reason)) 

312 super().addSkip(test, reason) 

313 

314 def addExpectedFailure(self, test, err): 

315 # If tblib isn't installed, pickling the traceback will always fail. 

316 # However we don't want tblib to be required for running the tests 

317 # when they pass or fail as expected. Drop the traceback when an 

318 # expected failure occurs. 

319 if tblib is None: 

320 err = err[0], err[1], None 

321 self.check_picklable(test, err) 

322 self.events.append(("addExpectedFailure", self.test_index, err)) 

323 super().addExpectedFailure(test, err) 

324 

325 def addUnexpectedSuccess(self, test): 

326 self.events.append(("addUnexpectedSuccess", self.test_index)) 

327 super().addUnexpectedSuccess(test) 

328 

329 def wasSuccessful(self): 

330 """Tells whether or not this result was a success.""" 

331 failure_types = {"addError", "addFailure", "addSubTest", "addUnexpectedSuccess"} 

332 return all(e[0] not in failure_types for e in self.events) 

333 

334 def _exc_info_to_string(self, err, test): 

335 # Make this method no-op. It only powers the default unittest behavior 

336 # for recording errors, but this class pickles errors into 'events' 

337 # instead. 

338 return "" 

339 

340 

341class RemoteTestRunner: 

342 """ 

343 Run tests and record everything but don't display anything. 

344 

345 The implementation matches the unpythonic coding style of unittest2. 

346 """ 

347 

348 resultclass = RemoteTestResult 

349 

350 def __init__(self, failfast=False, resultclass=None, buffer=False): 

351 self.failfast = failfast 

352 self.buffer = buffer 

353 if resultclass is not None: 

354 self.resultclass = resultclass 

355 

356 def run(self, test): 

357 result = self.resultclass() 

358 unittest.registerResult(result) 

359 result.failfast = self.failfast 

360 result.buffer = self.buffer 

361 test(result) 

362 return result 

363 

364 

365def get_max_test_processes(): 

366 """ 

367 The maximum number of test processes when using the --parallel option. 

368 """ 

369 # The current implementation of the parallel test runner requires 

370 # multiprocessing to start subprocesses with fork(). 

371 if multiprocessing.get_start_method() != "fork": 

372 return 1 

373 try: 

374 return int(os.environ["DJANGO_TEST_PROCESSES"]) 

375 except KeyError: 

376 return multiprocessing.cpu_count() 

377 

378 

379def parallel_type(value): 

380 """Parse value passed to the --parallel option.""" 

381 if value == "auto": 

382 return value 

383 try: 

384 return int(value) 

385 except ValueError: 

386 raise argparse.ArgumentTypeError( 

387 f"{value!r} is not an integer or the string 'auto'" 

388 ) 

389 

390 

391_worker_id = 0 

392 

393 

394def _init_worker(counter): 

395 """ 

396 Switch to databases dedicated to this worker. 

397 

398 This helper lives at module-level because of the multiprocessing module's 

399 requirements. 

400 """ 

401 

402 global _worker_id 

403 

404 with counter.get_lock(): 

405 counter.value += 1 

406 _worker_id = counter.value 

407 

408 for alias in connections: 

409 connection = connections[alias] 

410 settings_dict = connection.creation.get_test_db_clone_settings(str(_worker_id)) 

411 # connection.settings_dict must be updated in place for changes to be 

412 # reflected in django.db.connections. If the following line assigned 

413 # connection.settings_dict = settings_dict, new threads would connect 

414 # to the default database instead of the appropriate clone. 

415 connection.settings_dict.update(settings_dict) 

416 connection.close() 

417 

418 

419def _run_subsuite(args): 

420 """ 

421 Run a suite of tests with a RemoteTestRunner and return a RemoteTestResult. 

422 

423 This helper lives at module-level and its arguments are wrapped in a tuple 

424 because of the multiprocessing module's requirements. 

425 """ 

426 runner_class, subsuite_index, subsuite, failfast, buffer = args 

427 runner = runner_class(failfast=failfast, buffer=buffer) 

428 result = runner.run(subsuite) 

429 return subsuite_index, result.events 

430 

431 

432class ParallelTestSuite(unittest.TestSuite): 

433 """ 

434 Run a series of tests in parallel in several processes. 

435 

436 While the unittest module's documentation implies that orchestrating the 

437 execution of tests is the responsibility of the test runner, in practice, 

438 it appears that TestRunner classes are more concerned with formatting and 

439 displaying test results. 

440 

441 Since there are fewer use cases for customizing TestSuite than TestRunner, 

442 implementing parallelization at the level of the TestSuite improves 

443 interoperability with existing custom test runners. A single instance of a 

444 test runner can still collect results from all tests without being aware 

445 that they have been run in parallel. 

446 """ 

447 

448 # In case someone wants to modify these in a subclass. 

449 init_worker = _init_worker 

450 run_subsuite = _run_subsuite 

451 runner_class = RemoteTestRunner 

452 

453 def __init__(self, subsuites, processes, failfast=False, buffer=False): 

454 self.subsuites = subsuites 

455 self.processes = processes 

456 self.failfast = failfast 

457 self.buffer = buffer 

458 super().__init__() 

459 

460 def run(self, result): 

461 """ 

462 Distribute test cases across workers. 

463 

464 Return an identifier of each test case with its result in order to use 

465 imap_unordered to show results as soon as they're available. 

466 

467 To minimize pickling errors when getting results from workers: 

468 

469 - pass back numeric indexes in self.subsuites instead of tests 

470 - make tracebacks picklable with tblib, if available 

471 

472 Even with tblib, errors may still occur for dynamically created 

473 exception classes which cannot be unpickled. 

474 """ 

475 counter = multiprocessing.Value(ctypes.c_int, 0) 

476 pool = multiprocessing.Pool( 

477 processes=self.processes, 

478 initializer=self.init_worker.__func__, 

479 initargs=[counter], 

480 ) 

481 args = [ 

482 (self.runner_class, index, subsuite, self.failfast, self.buffer) 

483 for index, subsuite in enumerate(self.subsuites) 

484 ] 

485 test_results = pool.imap_unordered(self.run_subsuite.__func__, args) 

486 

487 while True: 

488 if result.shouldStop: 

489 pool.terminate() 

490 break 

491 

492 try: 

493 subsuite_index, events = test_results.next(timeout=0.1) 

494 except multiprocessing.TimeoutError: 

495 continue 

496 except StopIteration: 

497 pool.close() 

498 break 

499 

500 tests = list(self.subsuites[subsuite_index]) 

501 for event in events: 

502 event_name = event[0] 

503 handler = getattr(result, event_name, None) 

504 if handler is None: 

505 continue 

506 test = tests[event[1]] 

507 args = event[2:] 

508 handler(test, *args) 

509 

510 pool.join() 

511 

512 return result 

513 

514 def __iter__(self): 

515 return iter(self.subsuites) 

516 

517 

518class Shuffler: 

519 """ 

520 This class implements shuffling with a special consistency property. 

521 Consistency means that, for a given seed and key function, if two sets of 

522 items are shuffled, the resulting order will agree on the intersection of 

523 the two sets. For example, if items are removed from an original set, the 

524 shuffled order for the new set will be the shuffled order of the original 

525 set restricted to the smaller set. 

526 """ 

527 

528 # This doesn't need to be cryptographically strong, so use what's fastest. 

529 hash_algorithm = "md5" 

530 

531 @classmethod 

532 def _hash_text(cls, text): 

533 h = hashlib.new(cls.hash_algorithm) 

534 h.update(text.encode("utf-8")) 

535 return h.hexdigest() 

536 

537 def __init__(self, seed=None): 

538 if seed is None: 

539 # Limit seeds to 10 digits for simpler output. 

540 seed = random.randint(0, 10**10 - 1) 

541 seed_source = "generated" 

542 else: 

543 seed_source = "given" 

544 self.seed = seed 

545 self.seed_source = seed_source 

546 

547 @property 

548 def seed_display(self): 

549 return f"{self.seed!r} ({self.seed_source})" 

550 

551 def _hash_item(self, item, key): 

552 text = "{}{}".format(self.seed, key(item)) 

553 return self._hash_text(text) 

554 

555 def shuffle(self, items, key): 

556 """ 

557 Return a new list of the items in a shuffled order. 

558 

559 The `key` is a function that accepts an item in `items` and returns 

560 a string unique for that item that can be viewed as a string id. The 

561 order of the return value is deterministic. It depends on the seed 

562 and key function but not on the original order. 

563 """ 

564 hashes = {} 

565 for item in items: 

566 hashed = self._hash_item(item, key) 

567 if hashed in hashes: 

568 msg = "item {!r} has same hash {!r} as item {!r}".format( 

569 item, 

570 hashed, 

571 hashes[hashed], 

572 ) 

573 raise RuntimeError(msg) 

574 hashes[hashed] = item 

575 return [hashes[hashed] for hashed in sorted(hashes)] 

576 

577 

578class DiscoverRunner: 

579 """A Django test runner that uses unittest2 test discovery.""" 

580 

581 test_suite = unittest.TestSuite 

582 parallel_test_suite = ParallelTestSuite 

583 test_runner = unittest.TextTestRunner 

584 test_loader = unittest.defaultTestLoader 

585 reorder_by = (TestCase, SimpleTestCase) 

586 

587 def __init__( 

588 self, 

589 pattern=None, 

590 top_level=None, 

591 verbosity=1, 

592 interactive=True, 

593 failfast=False, 

594 keepdb=False, 

595 reverse=False, 

596 debug_mode=False, 

597 debug_sql=False, 

598 parallel=0, 

599 tags=None, 

600 exclude_tags=None, 

601 test_name_patterns=None, 

602 pdb=False, 

603 buffer=False, 

604 enable_faulthandler=True, 

605 timing=False, 

606 shuffle=False, 

607 logger=None, 

608 **kwargs, 

609 ): 

610 

611 self.pattern = pattern 

612 self.top_level = top_level 

613 self.verbosity = verbosity 

614 self.interactive = interactive 

615 self.failfast = failfast 

616 self.keepdb = keepdb 

617 self.reverse = reverse 

618 self.debug_mode = debug_mode 

619 self.debug_sql = debug_sql 

620 self.parallel = parallel 

621 self.tags = set(tags or []) 

622 self.exclude_tags = set(exclude_tags or []) 

623 if not faulthandler.is_enabled() and enable_faulthandler: 623 ↛ 628line 623 didn't jump to line 628, because the condition on line 623 was never false

624 try: 

625 faulthandler.enable(file=sys.stderr.fileno()) 

626 except (AttributeError, io.UnsupportedOperation): 

627 faulthandler.enable(file=sys.__stderr__.fileno()) 

628 self.pdb = pdb 

629 if self.pdb and self.parallel > 1: 629 ↛ 630line 629 didn't jump to line 630, because the condition on line 629 was never true

630 raise ValueError( 

631 "You cannot use --pdb with parallel tests; pass --parallel=1 to use it." 

632 ) 

633 self.buffer = buffer 

634 self.test_name_patterns = None 

635 self.time_keeper = TimeKeeper() if timing else NullTimeKeeper() 

636 if test_name_patterns: 636 ↛ 639line 636 didn't jump to line 639, because the condition on line 636 was never true

637 # unittest does not export the _convert_select_pattern function 

638 # that converts command-line arguments to patterns. 

639 self.test_name_patterns = { 

640 pattern if "*" in pattern else "*%s*" % pattern 

641 for pattern in test_name_patterns 

642 } 

643 self.shuffle = shuffle 

644 self._shuffler = None 

645 self.logger = logger 

646 

647 @classmethod 

648 def add_arguments(cls, parser): 

649 parser.add_argument( 

650 "-t", 

651 "--top-level-directory", 

652 dest="top_level", 

653 help="Top level of project for unittest discovery.", 

654 ) 

655 parser.add_argument( 

656 "-p", 

657 "--pattern", 

658 default="test*.py", 

659 help="The test matching pattern. Defaults to test*.py.", 

660 ) 

661 parser.add_argument( 

662 "--keepdb", action="store_true", help="Preserves the test DB between runs." 

663 ) 

664 parser.add_argument( 

665 "--shuffle", 

666 nargs="?", 

667 default=False, 

668 type=int, 

669 metavar="SEED", 

670 help="Shuffles test case order.", 

671 ) 

672 parser.add_argument( 

673 "-r", 

674 "--reverse", 

675 action="store_true", 

676 help="Reverses test case order.", 

677 ) 

678 parser.add_argument( 

679 "--debug-mode", 

680 action="store_true", 

681 help="Sets settings.DEBUG to True.", 

682 ) 

683 parser.add_argument( 

684 "-d", 

685 "--debug-sql", 

686 action="store_true", 

687 help="Prints logged SQL queries on failure.", 

688 ) 

689 parser.add_argument( 

690 "--parallel", 

691 nargs="?", 

692 const="auto", 

693 default=0, 

694 type=parallel_type, 

695 metavar="N", 

696 help=( 

697 "Run tests using up to N parallel processes. Use the value " 

698 '"auto" to run one test process for each processor core.' 

699 ), 

700 ) 

701 parser.add_argument( 

702 "--tag", 

703 action="append", 

704 dest="tags", 

705 help="Run only tests with the specified tag. Can be used multiple times.", 

706 ) 

707 parser.add_argument( 

708 "--exclude-tag", 

709 action="append", 

710 dest="exclude_tags", 

711 help="Do not run tests with the specified tag. Can be used multiple times.", 

712 ) 

713 parser.add_argument( 

714 "--pdb", 

715 action="store_true", 

716 help="Runs a debugger (pdb, or ipdb if installed) on error or failure.", 

717 ) 

718 parser.add_argument( 

719 "-b", 

720 "--buffer", 

721 action="store_true", 

722 help="Discard output from passing tests.", 

723 ) 

724 parser.add_argument( 

725 "--no-faulthandler", 

726 action="store_false", 

727 dest="enable_faulthandler", 

728 help="Disables the Python faulthandler module during tests.", 

729 ) 

730 parser.add_argument( 

731 "--timing", 

732 action="store_true", 

733 help=("Output timings, including database set up and total run time."), 

734 ) 

735 parser.add_argument( 

736 "-k", 

737 action="append", 

738 dest="test_name_patterns", 

739 help=( 

740 "Only run test methods and classes that match the pattern " 

741 "or substring. Can be used multiple times. Same as " 

742 "unittest -k option." 

743 ), 

744 ) 

745 

746 @property 

747 def shuffle_seed(self): 

748 if self._shuffler is None: 

749 return None 

750 return self._shuffler.seed 

751 

752 def log(self, msg, level=None): 

753 """ 

754 Log the message at the given logging level (the default is INFO). 

755 

756 If a logger isn't set, the message is instead printed to the console, 

757 respecting the configured verbosity. A verbosity of 0 prints no output, 

758 a verbosity of 1 prints INFO and above, and a verbosity of 2 or higher 

759 prints all levels. 

760 """ 

761 if level is None: 761 ↛ 763line 761 didn't jump to line 763, because the condition on line 761 was never false

762 level = logging.INFO 

763 if self.logger is None: 763 ↛ 768line 763 didn't jump to line 768, because the condition on line 763 was never false

764 if self.verbosity <= 0 or (self.verbosity == 1 and level < logging.INFO): 764 ↛ 765line 764 didn't jump to line 765, because the condition on line 764 was never true

765 return 

766 print(msg) 

767 else: 

768 self.logger.log(level, msg) 

769 

770 def setup_test_environment(self, **kwargs): 

771 setup_test_environment(debug=self.debug_mode) 

772 unittest.installHandler() 

773 

774 def setup_shuffler(self): 

775 if self.shuffle is False: 775 ↛ 777line 775 didn't jump to line 777, because the condition on line 775 was never false

776 return 

777 shuffler = Shuffler(seed=self.shuffle) 

778 self.log(f"Using shuffle seed: {shuffler.seed_display}") 

779 self._shuffler = shuffler 

780 

781 @contextmanager 

782 def load_with_patterns(self): 

783 original_test_name_patterns = self.test_loader.testNamePatterns 

784 self.test_loader.testNamePatterns = self.test_name_patterns 

785 try: 

786 yield 

787 finally: 

788 # Restore the original patterns. 

789 self.test_loader.testNamePatterns = original_test_name_patterns 

790 

791 def load_tests_for_label(self, label, discover_kwargs): 

792 label_as_path = os.path.abspath(label) 

793 tests = None 

794 

795 # If a module, or "module.ClassName[.method_name]", just run those. 

796 if not os.path.exists(label_as_path): 796 ↛ 797line 796 didn't jump to line 797, because the condition on line 796 was never true

797 with self.load_with_patterns(): 

798 tests = self.test_loader.loadTestsFromName(label) 

799 if tests.countTestCases(): 

800 return tests 

801 # Try discovery if "label" is a package or directory. 

802 is_importable, is_package = try_importing(label) 

803 if is_importable: 803 ↛ 804line 803 didn't jump to line 804, because the condition on line 803 was never true

804 if not is_package: 

805 return tests 

806 elif not os.path.isdir(label_as_path): 806 ↛ 807line 806 didn't jump to line 807, because the condition on line 806 was never true

807 if os.path.exists(label_as_path): 

808 assert tests is None 

809 raise RuntimeError( 

810 f"One of the test labels is a path to a file: {label!r}, " 

811 f"which is not supported. Use a dotted module name or " 

812 f"path to a directory instead." 

813 ) 

814 return tests 

815 

816 kwargs = discover_kwargs.copy() 

817 if os.path.isdir(label_as_path) and not self.top_level: 817 ↛ 820line 817 didn't jump to line 820, because the condition on line 817 was never false

818 kwargs["top_level_dir"] = find_top_level(label_as_path) 

819 

820 with self.load_with_patterns(): 

821 tests = self.test_loader.discover(start_dir=label, **kwargs) 

822 

823 # Make unittest forget the top-level dir it calculated from this run, 

824 # to support running tests from two different top-levels. 

825 self.test_loader._top_level_dir = None 

826 return tests 

827 

828 def build_suite(self, test_labels=None, extra_tests=None, **kwargs): 

829 if extra_tests is not None: 829 ↛ 830line 829 didn't jump to line 830, because the condition on line 829 was never true

830 warnings.warn( 

831 "The extra_tests argument is deprecated.", 

832 RemovedInDjango50Warning, 

833 stacklevel=2, 

834 ) 

835 test_labels = test_labels or ["."] 

836 extra_tests = extra_tests or [] 

837 

838 discover_kwargs = {} 

839 if self.pattern is not None: 839 ↛ 841line 839 didn't jump to line 841, because the condition on line 839 was never false

840 discover_kwargs["pattern"] = self.pattern 

841 if self.top_level is not None: 841 ↛ 842line 841 didn't jump to line 842, because the condition on line 841 was never true

842 discover_kwargs["top_level_dir"] = self.top_level 

843 self.setup_shuffler() 

844 

845 all_tests = [] 

846 for label in test_labels: 

847 tests = self.load_tests_for_label(label, discover_kwargs) 

848 all_tests.extend(iter_test_cases(tests)) 

849 

850 all_tests.extend(iter_test_cases(extra_tests)) 

851 

852 if self.tags or self.exclude_tags: 852 ↛ 853line 852 didn't jump to line 853, because the condition on line 852 was never true

853 if self.tags: 

854 self.log( 

855 "Including test tag(s): %s." % ", ".join(sorted(self.tags)), 

856 level=logging.DEBUG, 

857 ) 

858 if self.exclude_tags: 

859 self.log( 

860 "Excluding test tag(s): %s." % ", ".join(sorted(self.exclude_tags)), 

861 level=logging.DEBUG, 

862 ) 

863 all_tests = filter_tests_by_tags(all_tests, self.tags, self.exclude_tags) 

864 

865 # Put the failures detected at load time first for quicker feedback. 

866 # _FailedTest objects include things like test modules that couldn't be 

867 # found or that couldn't be loaded due to syntax errors. 

868 test_types = (unittest.loader._FailedTest, *self.reorder_by) 

869 all_tests = list( 

870 reorder_tests( 

871 all_tests, 

872 test_types, 

873 shuffler=self._shuffler, 

874 reverse=self.reverse, 

875 ) 

876 ) 

877 self.log("Found %d test(s)." % len(all_tests)) 

878 suite = self.test_suite(all_tests) 

879 

880 if self.parallel > 1: 880 ↛ 881line 880 didn't jump to line 881, because the condition on line 880 was never true

881 subsuites = partition_suite_by_case(suite) 

882 # Since tests are distributed across processes on a per-TestCase 

883 # basis, there's no need for more processes than TestCases. 

884 processes = min(self.parallel, len(subsuites)) 

885 # Update also "parallel" because it's used to determine the number 

886 # of test databases. 

887 self.parallel = processes 

888 if processes > 1: 

889 suite = self.parallel_test_suite( 

890 subsuites, 

891 processes, 

892 self.failfast, 

893 self.buffer, 

894 ) 

895 return suite 

896 

897 def setup_databases(self, **kwargs): 

898 return _setup_databases( 

899 self.verbosity, 

900 self.interactive, 

901 time_keeper=self.time_keeper, 

902 keepdb=self.keepdb, 

903 debug_sql=self.debug_sql, 

904 parallel=self.parallel, 

905 **kwargs, 

906 ) 

907 

908 def get_resultclass(self): 

909 if self.debug_sql: 909 ↛ 910line 909 didn't jump to line 910, because the condition on line 909 was never true

910 return DebugSQLTextTestResult 

911 elif self.pdb: 911 ↛ 912line 911 didn't jump to line 912, because the condition on line 911 was never true

912 return PDBDebugResult 

913 

914 def get_test_runner_kwargs(self): 

915 return { 

916 "failfast": self.failfast, 

917 "resultclass": self.get_resultclass(), 

918 "verbosity": self.verbosity, 

919 "buffer": self.buffer, 

920 } 

921 

922 def run_checks(self, databases): 

923 # Checks are run after database creation since some checks require 

924 # database access. 

925 call_command("check", verbosity=self.verbosity, databases=databases) 

926 

927 def run_suite(self, suite, **kwargs): 

928 kwargs = self.get_test_runner_kwargs() 

929 runner = self.test_runner(**kwargs) 

930 try: 

931 return runner.run(suite) 

932 finally: 

933 if self._shuffler is not None: 933 ↛ 934line 933 didn't jump to line 934, because the condition on line 933 was never true

934 seed_display = self._shuffler.seed_display 

935 self.log(f"Used shuffle seed: {seed_display}") 

936 

937 def teardown_databases(self, old_config, **kwargs): 

938 """Destroy all the non-mirror databases.""" 

939 _teardown_databases( 

940 old_config, 

941 verbosity=self.verbosity, 

942 parallel=self.parallel, 

943 keepdb=self.keepdb, 

944 ) 

945 

946 def teardown_test_environment(self, **kwargs): 

947 unittest.removeHandler() 

948 teardown_test_environment() 

949 

950 def suite_result(self, suite, result, **kwargs): 

951 return len(result.failures) + len(result.errors) 

952 

953 def _get_databases(self, suite): 

954 databases = {} 

955 for test in iter_test_cases(suite): 

956 test_databases = getattr(test, "databases", None) 

957 if test_databases == "__all__": 957 ↛ 958line 957 didn't jump to line 958, because the condition on line 957 was never true

958 test_databases = connections 

959 if test_databases: 959 ↛ 955line 959 didn't jump to line 955, because the condition on line 959 was never false

960 serialized_rollback = getattr(test, "serialized_rollback", False) 

961 databases.update( 

962 (alias, serialized_rollback or databases.get(alias, False)) 

963 for alias in test_databases 

964 ) 

965 return databases 

966 

967 def get_databases(self, suite): 

968 databases = self._get_databases(suite) 

969 unused_databases = [alias for alias in connections if alias not in databases] 

970 if unused_databases: 970 ↛ 971line 970 didn't jump to line 971, because the condition on line 970 was never true

971 self.log( 

972 "Skipping setup of unused database(s): %s." 

973 % ", ".join(sorted(unused_databases)), 

974 level=logging.DEBUG, 

975 ) 

976 return databases 

977 

978 def run_tests(self, test_labels, extra_tests=None, **kwargs): 

979 """ 

980 Run the unit tests for all the test labels in the provided list. 

981 

982 Test labels should be dotted Python paths to test modules, test 

983 classes, or test methods. 

984 

985 Return the number of tests that failed. 

986 """ 

987 if extra_tests is not None: 987 ↛ 988line 987 didn't jump to line 988, because the condition on line 987 was never true

988 warnings.warn( 

989 "The extra_tests argument is deprecated.", 

990 RemovedInDjango50Warning, 

991 stacklevel=2, 

992 ) 

993 self.setup_test_environment() 

994 suite = self.build_suite(test_labels, extra_tests) 

995 databases = self.get_databases(suite) 

996 serialized_aliases = set( 

997 alias for alias, serialize in databases.items() if serialize 

998 ) 

999 with self.time_keeper.timed("Total database setup"): 

1000 old_config = self.setup_databases( 

1001 aliases=databases, 

1002 serialized_aliases=serialized_aliases, 

1003 ) 

1004 run_failed = False 

1005 try: 

1006 self.run_checks(databases) 

1007 result = self.run_suite(suite) 

1008 except Exception: 

1009 run_failed = True 

1010 raise 

1011 finally: 

1012 try: 

1013 with self.time_keeper.timed("Total database teardown"): 

1014 self.teardown_databases(old_config) 

1015 self.teardown_test_environment() 1015 ↛ exitline 1015 didn't except from function 'run_tests', because the raise on line 1010 wasn't executed

1016 except Exception: 

1017 # Silence teardown exceptions if an exception was raised during 

1018 # runs to avoid shadowing it. 

1019 if not run_failed: 

1020 raise 

1021 self.time_keeper.print_results() 

1022 return self.suite_result(suite, result) 

1023 

1024 

1025def try_importing(label): 

1026 """ 

1027 Try importing a test label, and return (is_importable, is_package). 

1028 

1029 Relative labels like "." and ".." are seen as directories. 

1030 """ 

1031 try: 

1032 mod = import_module(label) 

1033 except (ImportError, TypeError): 

1034 return (False, False) 

1035 

1036 return (True, hasattr(mod, "__path__")) 

1037 

1038 

1039def find_top_level(top_level): 

1040 # Try to be a bit smarter than unittest about finding the default top-level 

1041 # for a given directory path, to avoid breaking relative imports. 

1042 # (Unittest's default is to set top-level equal to the path, which means 

1043 # relative imports will result in "Attempted relative import in 

1044 # non-package."). 

1045 

1046 # We'd be happy to skip this and require dotted module paths (which don't 

1047 # cause this problem) instead of file paths (which do), but in the case of 

1048 # a directory in the cwd, which would be equally valid if considered as a 

1049 # top-level module or as a directory path, unittest unfortunately prefers 

1050 # the latter. 

1051 while True: 

1052 init_py = os.path.join(top_level, "__init__.py") 

1053 if not os.path.exists(init_py): 1053 ↛ 1055line 1053 didn't jump to line 1055, because the condition on line 1053 was never false

1054 break 

1055 try_next = os.path.dirname(top_level) 

1056 if try_next == top_level: 

1057 # __init__.py all the way down? give up. 

1058 break 

1059 top_level = try_next 

1060 return top_level 

1061 

1062 

1063def _class_shuffle_key(cls): 

1064 return f"{cls.__module__}.{cls.__qualname__}" 

1065 

1066 

1067def shuffle_tests(tests, shuffler): 

1068 """ 

1069 Return an iterator over the given tests in a shuffled order, keeping tests 

1070 next to other tests of their class. 

1071 

1072 `tests` should be an iterable of tests. 

1073 """ 

1074 tests_by_type = {} 

1075 for _, class_tests in itertools.groupby(tests, type): 

1076 class_tests = list(class_tests) 

1077 test_type = type(class_tests[0]) 

1078 class_tests = shuffler.shuffle(class_tests, key=lambda test: test.id()) 

1079 tests_by_type[test_type] = class_tests 

1080 

1081 classes = shuffler.shuffle(tests_by_type, key=_class_shuffle_key) 

1082 

1083 return itertools.chain(*(tests_by_type[cls] for cls in classes)) 

1084 

1085 

1086def reorder_test_bin(tests, shuffler=None, reverse=False): 

1087 """ 

1088 Return an iterator that reorders the given tests, keeping tests next to 

1089 other tests of their class. 

1090 

1091 `tests` should be an iterable of tests that supports reversed(). 

1092 """ 

1093 if shuffler is None: 1093 ↛ 1099line 1093 didn't jump to line 1099, because the condition on line 1093 was never false

1094 if reverse: 1094 ↛ 1095line 1094 didn't jump to line 1095, because the condition on line 1094 was never true

1095 return reversed(tests) 

1096 # The function must return an iterator. 

1097 return iter(tests) 

1098 

1099 tests = shuffle_tests(tests, shuffler) 

1100 if not reverse: 

1101 return tests 

1102 # Arguments to reversed() must be reversible. 

1103 return reversed(list(tests)) 

1104 

1105 

1106def reorder_tests(tests, classes, reverse=False, shuffler=None): 

1107 """ 

1108 Reorder an iterable of tests, grouping by the given TestCase classes. 

1109 

1110 This function also removes any duplicates and reorders so that tests of the 

1111 same type are consecutive. 

1112 

1113 The result is returned as an iterator. `classes` is a sequence of types. 

1114 Tests that are instances of `classes[0]` are grouped first, followed by 

1115 instances of `classes[1]`, etc. Tests that are not instances of any of the 

1116 classes are grouped last. 

1117 

1118 If `reverse` is True, the tests within each `classes` group are reversed, 

1119 but without reversing the order of `classes` itself. 

1120 

1121 The `shuffler` argument is an optional instance of this module's `Shuffler` 

1122 class. If provided, tests will be shuffled within each `classes` group, but 

1123 keeping tests with other tests of their TestCase class. Reversing is 

1124 applied after shuffling to allow reversing the same random order. 

1125 """ 

1126 # Each bin maps TestCase class to OrderedSet of tests. This permits tests 

1127 # to be grouped by TestCase class even if provided non-consecutively. 

1128 bins = [defaultdict(OrderedSet) for i in range(len(classes) + 1)] 

1129 *class_bins, last_bin = bins 

1130 

1131 for test in tests: 

1132 for test_bin, test_class in zip(class_bins, classes): 1132 ↛ 1136line 1132 didn't jump to line 1136, because the loop on line 1132 didn't complete

1133 if isinstance(test, test_class): 

1134 break 

1135 else: 

1136 test_bin = last_bin 

1137 test_bin[type(test)].add(test) 

1138 

1139 for test_bin in bins: 

1140 # Call list() since reorder_test_bin()'s input must support reversed(). 

1141 tests = list(itertools.chain.from_iterable(test_bin.values())) 

1142 yield from reorder_test_bin(tests, shuffler=shuffler, reverse=reverse) 

1143 

1144 

1145def partition_suite_by_case(suite): 

1146 """Partition a test suite by test case, preserving the order of tests.""" 

1147 suite_class = type(suite) 

1148 all_tests = iter_test_cases(suite) 

1149 return [suite_class(tests) for _, tests in itertools.groupby(all_tests, type)] 

1150 

1151 

1152def test_match_tags(test, tags, exclude_tags): 

1153 if isinstance(test, unittest.loader._FailedTest): 

1154 # Tests that couldn't load always match to prevent tests from falsely 

1155 # passing due e.g. to syntax errors. 

1156 return True 

1157 test_tags = set(getattr(test, "tags", [])) 

1158 test_fn_name = getattr(test, "_testMethodName", str(test)) 

1159 if hasattr(test, test_fn_name): 

1160 test_fn = getattr(test, test_fn_name) 

1161 test_fn_tags = list(getattr(test_fn, "tags", [])) 

1162 test_tags = test_tags.union(test_fn_tags) 

1163 if tags and test_tags.isdisjoint(tags): 

1164 return False 

1165 return test_tags.isdisjoint(exclude_tags) 

1166 

1167 

1168def filter_tests_by_tags(tests, tags, exclude_tags): 

1169 """Return the matching tests as an iterator.""" 

1170 return (test for test in tests if test_match_tags(test, tags, exclude_tags))