Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/test/testcases.py: 33%

834 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1import asyncio 

2import difflib 

3import json 

4import logging 

5import posixpath 

6import sys 

7import threading 

8import unittest 

9import warnings 

10from collections import Counter 

11from contextlib import contextmanager 

12from copy import copy, deepcopy 

13from difflib import get_close_matches 

14from functools import wraps 

15from unittest.suite import _DebugResult 

16from unittest.util import safe_repr 

17from urllib.parse import ( 

18 parse_qsl, 

19 unquote, 

20 urlencode, 

21 urljoin, 

22 urlparse, 

23 urlsplit, 

24 urlunparse, 

25) 

26from urllib.request import url2pathname 

27 

28from asgiref.sync import async_to_sync 

29 

30from django.apps import apps 

31from django.conf import settings 

32from django.core import mail 

33from django.core.exceptions import ImproperlyConfigured, ValidationError 

34from django.core.files import locks 

35from django.core.handlers.wsgi import WSGIHandler, get_path_info 

36from django.core.management import call_command 

37from django.core.management.color import no_style 

38from django.core.management.sql import emit_post_migrate_signal 

39from django.core.servers.basehttp import ThreadedWSGIServer, WSGIRequestHandler 

40from django.db import DEFAULT_DB_ALIAS, connection, connections, transaction 

41from django.forms.fields import CharField 

42from django.http import QueryDict 

43from django.http.request import split_domain_port, validate_host 

44from django.test.client import AsyncClient, Client 

45from django.test.html import HTMLParseError, parse_html 

46from django.test.signals import setting_changed, template_rendered 

47from django.test.utils import ( 

48 CaptureQueriesContext, 

49 ContextList, 

50 compare_xml, 

51 modify_settings, 

52 override_settings, 

53) 

54from django.utils.deprecation import RemovedInDjango41Warning 

55from django.utils.functional import classproperty 

56from django.utils.version import PY310 

57from django.views.static import serve 

58 

59__all__ = ( 

60 "TestCase", 

61 "TransactionTestCase", 

62 "SimpleTestCase", 

63 "skipIfDBFeature", 

64 "skipUnlessDBFeature", 

65) 

66 

67 

68def to_list(value): 

69 """ 

70 Put value into a list if it's not already one. Return an empty list if 

71 value is None. 

72 """ 

73 if value is None: 

74 value = [] 

75 elif not isinstance(value, list): 

76 value = [value] 

77 return value 

78 

79 

80def assert_and_parse_html(self, html, user_msg, msg): 

81 try: 

82 dom = parse_html(html) 

83 except HTMLParseError as e: 

84 standardMsg = "%s\n%s" % (msg, e) 

85 self.fail(self._formatMessage(user_msg, standardMsg)) 

86 return dom 

87 

88 

89class _AssertNumQueriesContext(CaptureQueriesContext): 

90 def __init__(self, test_case, num, connection): 

91 self.test_case = test_case 

92 self.num = num 

93 super().__init__(connection) 

94 

95 def __exit__(self, exc_type, exc_value, traceback): 

96 super().__exit__(exc_type, exc_value, traceback) 

97 if exc_type is not None: 

98 return 

99 executed = len(self) 

100 self.test_case.assertEqual( 

101 executed, 

102 self.num, 

103 "%d queries executed, %d expected\nCaptured queries were:\n%s" 

104 % ( 

105 executed, 

106 self.num, 

107 "\n".join( 

108 "%d. %s" % (i, query["sql"]) 

109 for i, query in enumerate(self.captured_queries, start=1) 

110 ), 

111 ), 

112 ) 

113 

114 

115class _AssertTemplateUsedContext: 

116 def __init__(self, test_case, template_name): 

117 self.test_case = test_case 

118 self.template_name = template_name 

119 self.rendered_templates = [] 

120 self.rendered_template_names = [] 

121 self.context = ContextList() 

122 

123 def on_template_render(self, sender, signal, template, context, **kwargs): 

124 self.rendered_templates.append(template) 

125 self.rendered_template_names.append(template.name) 

126 self.context.append(copy(context)) 

127 

128 def test(self): 

129 return self.template_name in self.rendered_template_names 

130 

131 def message(self): 

132 return "%s was not rendered." % self.template_name 

133 

134 def __enter__(self): 

135 template_rendered.connect(self.on_template_render) 

136 return self 

137 

138 def __exit__(self, exc_type, exc_value, traceback): 

139 template_rendered.disconnect(self.on_template_render) 

140 if exc_type is not None: 

141 return 

142 

143 if not self.test(): 

144 message = self.message() 

145 if self.rendered_templates: 

146 message += " Following templates were rendered: %s" % ( 

147 ", ".join(self.rendered_template_names) 

148 ) 

149 else: 

150 message += " No template was rendered." 

151 self.test_case.fail(message) 

152 

153 

154class _AssertTemplateNotUsedContext(_AssertTemplateUsedContext): 

155 def test(self): 

156 return self.template_name not in self.rendered_template_names 

157 

158 def message(self): 

159 return "%s was rendered." % self.template_name 

160 

161 

162class _DatabaseFailure: 

163 def __init__(self, wrapped, message): 

164 self.wrapped = wrapped 

165 self.message = message 

166 

167 def __call__(self): 

168 raise AssertionError(self.message) 

169 

170 

171class SimpleTestCase(unittest.TestCase): 

172 

173 # The class we'll use for the test client self.client. 

174 # Can be overridden in derived classes. 

175 client_class = Client 

176 async_client_class = AsyncClient 

177 _overridden_settings = None 

178 _modified_settings = None 

179 

180 databases = set() 

181 _disallowed_database_msg = ( 

182 "Database %(operation)s to %(alias)r are not allowed in SimpleTestCase " 

183 "subclasses. Either subclass TestCase or TransactionTestCase to ensure " 

184 "proper test isolation or add %(alias)r to %(test)s.databases to silence " 

185 "this failure." 

186 ) 

187 _disallowed_connection_methods = [ 

188 ("connect", "connections"), 

189 ("temporary_connection", "connections"), 

190 ("cursor", "queries"), 

191 ("chunked_cursor", "queries"), 

192 ] 

193 

194 @classmethod 

195 def setUpClass(cls): 

196 super().setUpClass() 

197 if cls._overridden_settings: 197 ↛ 198line 197 didn't jump to line 198, because the condition on line 197 was never true

198 cls._cls_overridden_context = override_settings(**cls._overridden_settings) 

199 cls._cls_overridden_context.enable() 

200 cls.addClassCleanup(cls._cls_overridden_context.disable) 

201 if cls._modified_settings: 201 ↛ 202line 201 didn't jump to line 202, because the condition on line 201 was never true

202 cls._cls_modified_context = modify_settings(cls._modified_settings) 

203 cls._cls_modified_context.enable() 

204 cls.addClassCleanup(cls._cls_modified_context.disable) 

205 cls._add_databases_failures() 

206 cls.addClassCleanup(cls._remove_databases_failures) 

207 

208 @classmethod 

209 def _validate_databases(cls): 

210 if cls.databases == "__all__": 210 ↛ 211line 210 didn't jump to line 211, because the condition on line 210 was never true

211 return frozenset(connections) 

212 for alias in cls.databases: 

213 if alias not in connections: 213 ↛ 214line 213 didn't jump to line 214

214 message = ( 

215 "%s.%s.databases refers to %r which is not defined in " 

216 "settings.DATABASES." 

217 % ( 

218 cls.__module__, 

219 cls.__qualname__, 

220 alias, 

221 ) 

222 ) 

223 close_matches = get_close_matches(alias, list(connections)) 

224 if close_matches: 

225 message += " Did you mean %r?" % close_matches[0] 

226 raise ImproperlyConfigured(message) 

227 return frozenset(cls.databases) 

228 

229 @classmethod 

230 def _add_databases_failures(cls): 

231 cls.databases = cls._validate_databases() 

232 for alias in connections: 

233 if alias in cls.databases: 233 ↛ 235line 233 didn't jump to line 235, because the condition on line 233 was never false

234 continue 

235 connection = connections[alias] 

236 for name, operation in cls._disallowed_connection_methods: 

237 message = cls._disallowed_database_msg % { 

238 "test": "%s.%s" % (cls.__module__, cls.__qualname__), 

239 "alias": alias, 

240 "operation": operation, 

241 } 

242 method = getattr(connection, name) 

243 setattr(connection, name, _DatabaseFailure(method, message)) 

244 

245 @classmethod 

246 def _remove_databases_failures(cls): 

247 for alias in connections: 

248 if alias in cls.databases: 248 ↛ 250line 248 didn't jump to line 250, because the condition on line 248 was never false

249 continue 

250 connection = connections[alias] 

251 for name, _ in cls._disallowed_connection_methods: 

252 method = getattr(connection, name) 

253 setattr(connection, name, method.wrapped) 

254 

255 def __call__(self, result=None): 

256 """ 

257 Wrapper around default __call__ method to perform common Django test 

258 set up. This means that user-defined Test Cases aren't required to 

259 include a call to super().setUp(). 

260 """ 

261 self._setup_and_call(result) 

262 

263 def debug(self): 

264 """Perform the same as __call__(), without catching the exception.""" 

265 debug_result = _DebugResult() 

266 self._setup_and_call(debug_result, debug=True) 

267 

268 def _setup_and_call(self, result, debug=False): 

269 """ 

270 Perform the following in order: pre-setup, run test, post-teardown, 

271 skipping pre/post hooks if test is set to be skipped. 

272 

273 If debug=True, reraise any errors in setup and use super().debug() 

274 instead of __call__() to run the test. 

275 """ 

276 testMethod = getattr(self, self._testMethodName) 

277 skipped = getattr(self.__class__, "__unittest_skip__", False) or getattr( 

278 testMethod, "__unittest_skip__", False 

279 ) 

280 

281 # Convert async test methods. 

282 if asyncio.iscoroutinefunction(testMethod): 282 ↛ 283line 282 didn't jump to line 283, because the condition on line 282 was never true

283 setattr(self, self._testMethodName, async_to_sync(testMethod)) 

284 

285 if not skipped: 285 ↛ 293line 285 didn't jump to line 293, because the condition on line 285 was never false

286 try: 

287 self._pre_setup() 

288 except Exception: 

289 if debug: 

290 raise 

291 result.addError(self, sys.exc_info()) 

292 return 

293 if debug: 293 ↛ 294line 293 didn't jump to line 294, because the condition on line 293 was never true

294 super().debug() 

295 else: 

296 super().__call__(result) 

297 if not skipped: 297 ↛ exitline 297 didn't return from function '_setup_and_call', because the condition on line 297 was never false

298 try: 

299 self._post_teardown() 

300 except Exception: 

301 if debug: 

302 raise 

303 result.addError(self, sys.exc_info()) 

304 return 

305 

306 def _pre_setup(self): 

307 """ 

308 Perform pre-test setup: 

309 * Create a test client. 

310 * Clear the mail test outbox. 

311 """ 

312 self.client = self.client_class() 

313 self.async_client = self.async_client_class() 

314 mail.outbox = [] 

315 

316 def _post_teardown(self): 

317 """Perform post-test things.""" 

318 pass 

319 

320 def settings(self, **kwargs): 

321 """ 

322 A context manager that temporarily sets a setting and reverts to the 

323 original value when exiting the context. 

324 """ 

325 return override_settings(**kwargs) 

326 

327 def modify_settings(self, **kwargs): 

328 """ 

329 A context manager that temporarily applies changes a list setting and 

330 reverts back to the original value when exiting the context. 

331 """ 

332 return modify_settings(**kwargs) 

333 

334 def assertRedirects( 

335 self, 

336 response, 

337 expected_url, 

338 status_code=302, 

339 target_status_code=200, 

340 msg_prefix="", 

341 fetch_redirect_response=True, 

342 ): 

343 """ 

344 Assert that a response redirected to a specific URL and that the 

345 redirect URL can be loaded. 

346 

347 Won't work for external links since it uses the test client to do a 

348 request (use fetch_redirect_response=False to check such links without 

349 fetching them). 

350 """ 

351 if msg_prefix: 

352 msg_prefix += ": " 

353 

354 if hasattr(response, "redirect_chain"): 

355 # The request was a followed redirect 

356 self.assertTrue( 

357 response.redirect_chain, 

358 msg_prefix 

359 + ( 

360 "Response didn't redirect as expected: Response code was %d " 

361 "(expected %d)" 

362 ) 

363 % (response.status_code, status_code), 

364 ) 

365 

366 self.assertEqual( 

367 response.redirect_chain[0][1], 

368 status_code, 

369 msg_prefix 

370 + ( 

371 "Initial response didn't redirect as expected: Response code was " 

372 "%d (expected %d)" 

373 ) 

374 % (response.redirect_chain[0][1], status_code), 

375 ) 

376 

377 url, status_code = response.redirect_chain[-1] 

378 

379 self.assertEqual( 

380 response.status_code, 

381 target_status_code, 

382 msg_prefix 

383 + ( 

384 "Response didn't redirect as expected: Final Response code was %d " 

385 "(expected %d)" 

386 ) 

387 % (response.status_code, target_status_code), 

388 ) 

389 

390 else: 

391 # Not a followed redirect 

392 self.assertEqual( 

393 response.status_code, 

394 status_code, 

395 msg_prefix 

396 + ( 

397 "Response didn't redirect as expected: Response code was %d " 

398 "(expected %d)" 

399 ) 

400 % (response.status_code, status_code), 

401 ) 

402 

403 url = response.url 

404 scheme, netloc, path, query, fragment = urlsplit(url) 

405 

406 # Prepend the request path to handle relative path redirects. 

407 if not path.startswith("/"): 

408 url = urljoin(response.request["PATH_INFO"], url) 

409 path = urljoin(response.request["PATH_INFO"], path) 

410 

411 if fetch_redirect_response: 

412 # netloc might be empty, or in cases where Django tests the 

413 # HTTP scheme, the convention is for netloc to be 'testserver'. 

414 # Trust both as "internal" URLs here. 

415 domain, port = split_domain_port(netloc) 

416 if domain and not validate_host(domain, settings.ALLOWED_HOSTS): 

417 raise ValueError( 

418 "The test client is unable to fetch remote URLs (got %s). " 

419 "If the host is served by Django, add '%s' to ALLOWED_HOSTS. " 

420 "Otherwise, use " 

421 "assertRedirects(..., fetch_redirect_response=False)." 

422 % (url, domain) 

423 ) 

424 # Get the redirection page, using the same client that was used 

425 # to obtain the original response. 

426 extra = response.client.extra or {} 

427 redirect_response = response.client.get( 

428 path, 

429 QueryDict(query), 

430 secure=(scheme == "https"), 

431 **extra, 

432 ) 

433 self.assertEqual( 

434 redirect_response.status_code, 

435 target_status_code, 

436 msg_prefix 

437 + ( 

438 "Couldn't retrieve redirection page '%s': response code was %d " 

439 "(expected %d)" 

440 ) 

441 % (path, redirect_response.status_code, target_status_code), 

442 ) 

443 

444 self.assertURLEqual( 

445 url, 

446 expected_url, 

447 msg_prefix 

448 + "Response redirected to '%s', expected '%s'" % (url, expected_url), 

449 ) 

450 

451 def assertURLEqual(self, url1, url2, msg_prefix=""): 

452 """ 

453 Assert that two URLs are the same, ignoring the order of query string 

454 parameters except for parameters with the same name. 

455 

456 For example, /path/?x=1&y=2 is equal to /path/?y=2&x=1, but 

457 /path/?a=1&a=2 isn't equal to /path/?a=2&a=1. 

458 """ 

459 

460 def normalize(url): 

461 """Sort the URL's query string parameters.""" 

462 url = str(url) # Coerce reverse_lazy() URLs. 

463 scheme, netloc, path, params, query, fragment = urlparse(url) 

464 query_parts = sorted(parse_qsl(query)) 

465 return urlunparse( 

466 (scheme, netloc, path, params, urlencode(query_parts), fragment) 

467 ) 

468 

469 self.assertEqual( 

470 normalize(url1), 

471 normalize(url2), 

472 msg_prefix + "Expected '%s' to equal '%s'." % (url1, url2), 

473 ) 

474 

475 def _assert_contains(self, response, text, status_code, msg_prefix, html): 

476 # If the response supports deferred rendering and hasn't been rendered 

477 # yet, then ensure that it does get rendered before proceeding further. 

478 if ( 

479 hasattr(response, "render") 

480 and callable(response.render) 

481 and not response.is_rendered 

482 ): 

483 response.render() 

484 

485 if msg_prefix: 

486 msg_prefix += ": " 

487 

488 self.assertEqual( 

489 response.status_code, 

490 status_code, 

491 msg_prefix + "Couldn't retrieve content: Response code was %d" 

492 " (expected %d)" % (response.status_code, status_code), 

493 ) 

494 

495 if response.streaming: 

496 content = b"".join(response.streaming_content) 

497 else: 

498 content = response.content 

499 if not isinstance(text, bytes) or html: 

500 text = str(text) 

501 content = content.decode(response.charset) 

502 text_repr = "'%s'" % text 

503 else: 

504 text_repr = repr(text) 

505 if html: 

506 content = assert_and_parse_html( 

507 self, content, None, "Response's content is not valid HTML:" 

508 ) 

509 text = assert_and_parse_html( 

510 self, text, None, "Second argument is not valid HTML:" 

511 ) 

512 real_count = content.count(text) 

513 return (text_repr, real_count, msg_prefix) 

514 

515 def assertContains( 

516 self, response, text, count=None, status_code=200, msg_prefix="", html=False 

517 ): 

518 """ 

519 Assert that a response indicates that some content was retrieved 

520 successfully, (i.e., the HTTP status code was as expected) and that 

521 ``text`` occurs ``count`` times in the content of the response. 

522 If ``count`` is None, the count doesn't matter - the assertion is true 

523 if the text occurs at least once in the response. 

524 """ 

525 text_repr, real_count, msg_prefix = self._assert_contains( 

526 response, text, status_code, msg_prefix, html 

527 ) 

528 

529 if count is not None: 

530 self.assertEqual( 

531 real_count, 

532 count, 

533 msg_prefix 

534 + "Found %d instances of %s in response (expected %d)" 

535 % (real_count, text_repr, count), 

536 ) 

537 else: 

538 self.assertTrue( 

539 real_count != 0, msg_prefix + "Couldn't find %s in response" % text_repr 

540 ) 

541 

542 def assertNotContains( 

543 self, response, text, status_code=200, msg_prefix="", html=False 

544 ): 

545 """ 

546 Assert that a response indicates that some content was retrieved 

547 successfully, (i.e., the HTTP status code was as expected) and that 

548 ``text`` doesn't occur in the content of the response. 

549 """ 

550 text_repr, real_count, msg_prefix = self._assert_contains( 

551 response, text, status_code, msg_prefix, html 

552 ) 

553 

554 self.assertEqual( 

555 real_count, 0, msg_prefix + "Response should not contain %s" % text_repr 

556 ) 

557 

558 def assertFormError(self, response, form, field, errors, msg_prefix=""): 

559 """ 

560 Assert that a form used to render the response has a specific field 

561 error. 

562 """ 

563 if msg_prefix: 

564 msg_prefix += ": " 

565 

566 # Put context(s) into a list to simplify processing. 

567 contexts = to_list(response.context) 

568 if not contexts: 

569 self.fail( 

570 msg_prefix + "Response did not use any contexts to render the response" 

571 ) 

572 

573 # Put error(s) into a list to simplify processing. 

574 errors = to_list(errors) 

575 

576 # Search all contexts for the error. 

577 found_form = False 

578 for i, context in enumerate(contexts): 

579 if form not in context: 

580 continue 

581 found_form = True 

582 for err in errors: 

583 if field: 

584 if field in context[form].errors: 

585 field_errors = context[form].errors[field] 

586 self.assertTrue( 

587 err in field_errors, 

588 msg_prefix + "The field '%s' on form '%s' in" 

589 " context %d does not contain the error '%s'" 

590 " (actual errors: %s)" 

591 % (field, form, i, err, repr(field_errors)), 

592 ) 

593 elif field in context[form].fields: 

594 self.fail( 

595 msg_prefix 

596 + ( 

597 "The field '%s' on form '%s' in context %d contains no " 

598 "errors" 

599 ) 

600 % (field, form, i) 

601 ) 

602 else: 

603 self.fail( 

604 msg_prefix 

605 + ( 

606 "The form '%s' in context %d does not contain the " 

607 "field '%s'" 

608 ) 

609 % (form, i, field) 

610 ) 

611 else: 

612 non_field_errors = context[form].non_field_errors() 

613 self.assertTrue( 

614 err in non_field_errors, 

615 msg_prefix + "The form '%s' in context %d does not" 

616 " contain the non-field error '%s'" 

617 " (actual errors: %s)" 

618 % (form, i, err, non_field_errors or "none"), 

619 ) 

620 if not found_form: 

621 self.fail( 

622 msg_prefix + "The form '%s' was not used to render the response" % form 

623 ) 

624 

625 def assertFormsetError( 

626 self, response, formset, form_index, field, errors, msg_prefix="" 

627 ): 

628 """ 

629 Assert that a formset used to render the response has a specific error. 

630 

631 For field errors, specify the ``form_index`` and the ``field``. 

632 For non-field errors, specify the ``form_index`` and the ``field`` as 

633 None. 

634 For non-form errors, specify ``form_index`` as None and the ``field`` 

635 as None. 

636 """ 

637 # Add punctuation to msg_prefix 

638 if msg_prefix: 

639 msg_prefix += ": " 

640 

641 # Put context(s) into a list to simplify processing. 

642 contexts = to_list(response.context) 

643 if not contexts: 

644 self.fail( 

645 msg_prefix + "Response did not use any contexts to " 

646 "render the response" 

647 ) 

648 

649 # Put error(s) into a list to simplify processing. 

650 errors = to_list(errors) 

651 

652 # Search all contexts for the error. 

653 found_formset = False 

654 for i, context in enumerate(contexts): 

655 if formset not in context or not hasattr(context[formset], "forms"): 

656 continue 

657 found_formset = True 

658 for err in errors: 

659 if field is not None: 

660 if field in context[formset].forms[form_index].errors: 

661 field_errors = context[formset].forms[form_index].errors[field] 

662 self.assertTrue( 

663 err in field_errors, 

664 msg_prefix + "The field '%s' on formset '%s', " 

665 "form %d in context %d does not contain the " 

666 "error '%s' (actual errors: %s)" 

667 % (field, formset, form_index, i, err, repr(field_errors)), 

668 ) 

669 elif field in context[formset].forms[form_index].fields: 

670 self.fail( 

671 msg_prefix 

672 + ( 

673 "The field '%s' on formset '%s', form %d in context " 

674 "%d contains no errors" 

675 ) 

676 % (field, formset, form_index, i) 

677 ) 

678 else: 

679 self.fail( 

680 msg_prefix 

681 + ( 

682 "The formset '%s', form %d in context %d does not " 

683 "contain the field '%s'" 

684 ) 

685 % (formset, form_index, i, field) 

686 ) 

687 elif form_index is not None: 

688 non_field_errors = ( 

689 context[formset].forms[form_index].non_field_errors() 

690 ) 

691 self.assertFalse( 

692 not non_field_errors, 

693 msg_prefix + "The formset '%s', form %d in context %d " 

694 "does not contain any non-field errors." 

695 % (formset, form_index, i), 

696 ) 

697 self.assertTrue( 

698 err in non_field_errors, 

699 msg_prefix + "The formset '%s', form %d in context %d " 

700 "does not contain the non-field error '%s' (actual errors: %s)" 

701 % (formset, form_index, i, err, repr(non_field_errors)), 

702 ) 

703 else: 

704 non_form_errors = context[formset].non_form_errors() 

705 self.assertFalse( 

706 not non_form_errors, 

707 msg_prefix + "The formset '%s' in context %d does not " 

708 "contain any non-form errors." % (formset, i), 

709 ) 

710 self.assertTrue( 

711 err in non_form_errors, 

712 msg_prefix + "The formset '%s' in context %d does not " 

713 "contain the non-form error '%s' (actual errors: %s)" 

714 % (formset, i, err, repr(non_form_errors)), 

715 ) 

716 if not found_formset: 

717 self.fail( 

718 msg_prefix 

719 + "The formset '%s' was not used to render the response" % formset 

720 ) 

721 

722 def _assert_template_used(self, response, template_name, msg_prefix): 

723 

724 if response is None and template_name is None: 

725 raise TypeError("response and/or template_name argument must be provided") 

726 

727 if msg_prefix: 

728 msg_prefix += ": " 

729 

730 if ( 

731 template_name is not None 

732 and response is not None 

733 and not hasattr(response, "templates") 

734 ): 

735 raise ValueError( 

736 "assertTemplateUsed() and assertTemplateNotUsed() are only " 

737 "usable on responses fetched using the Django test Client." 

738 ) 

739 

740 if not hasattr(response, "templates") or (response is None and template_name): 

741 if response: 

742 template_name = response 

743 response = None 

744 # use this template with context manager 

745 return template_name, None, msg_prefix 

746 

747 template_names = [t.name for t in response.templates if t.name is not None] 

748 return None, template_names, msg_prefix 

749 

750 def assertTemplateUsed( 

751 self, response=None, template_name=None, msg_prefix="", count=None 

752 ): 

753 """ 

754 Assert that the template with the provided name was used in rendering 

755 the response. Also usable as context manager. 

756 """ 

757 context_mgr_template, template_names, msg_prefix = self._assert_template_used( 

758 response, template_name, msg_prefix 

759 ) 

760 

761 if context_mgr_template: 

762 # Use assertTemplateUsed as context manager. 

763 return _AssertTemplateUsedContext(self, context_mgr_template) 

764 

765 if not template_names: 

766 self.fail(msg_prefix + "No templates used to render the response") 

767 self.assertTrue( 

768 template_name in template_names, 

769 msg_prefix + "Template '%s' was not a template used to render" 

770 " the response. Actual template(s) used: %s" 

771 % (template_name, ", ".join(template_names)), 

772 ) 

773 

774 if count is not None: 

775 self.assertEqual( 

776 template_names.count(template_name), 

777 count, 

778 msg_prefix + "Template '%s' was expected to be rendered %d " 

779 "time(s) but was actually rendered %d time(s)." 

780 % (template_name, count, template_names.count(template_name)), 

781 ) 

782 

783 def assertTemplateNotUsed(self, response=None, template_name=None, msg_prefix=""): 

784 """ 

785 Assert that the template with the provided name was NOT used in 

786 rendering the response. Also usable as context manager. 

787 """ 

788 context_mgr_template, template_names, msg_prefix = self._assert_template_used( 

789 response, template_name, msg_prefix 

790 ) 

791 if context_mgr_template: 

792 # Use assertTemplateNotUsed as context manager. 

793 return _AssertTemplateNotUsedContext(self, context_mgr_template) 

794 

795 self.assertFalse( 

796 template_name in template_names, 

797 msg_prefix 

798 + "Template '%s' was used unexpectedly in rendering the response" 

799 % template_name, 

800 ) 

801 

802 @contextmanager 

803 def _assert_raises_or_warns_cm( 

804 self, func, cm_attr, expected_exception, expected_message 

805 ): 

806 with func(expected_exception) as cm: 

807 yield cm 

808 self.assertIn(expected_message, str(getattr(cm, cm_attr))) 

809 

810 def _assertFooMessage( 

811 self, func, cm_attr, expected_exception, expected_message, *args, **kwargs 

812 ): 

813 callable_obj = None 

814 if args: 

815 callable_obj, *args = args 

816 cm = self._assert_raises_or_warns_cm( 

817 func, cm_attr, expected_exception, expected_message 

818 ) 

819 # Assertion used in context manager fashion. 

820 if callable_obj is None: 

821 return cm 

822 # Assertion was passed a callable. 

823 with cm: 

824 callable_obj(*args, **kwargs) 

825 

826 def assertRaisesMessage( 

827 self, expected_exception, expected_message, *args, **kwargs 

828 ): 

829 """ 

830 Assert that expected_message is found in the message of a raised 

831 exception. 

832 

833 Args: 

834 expected_exception: Exception class expected to be raised. 

835 expected_message: expected error message string value. 

836 args: Function to be called and extra positional args. 

837 kwargs: Extra kwargs. 

838 """ 

839 return self._assertFooMessage( 

840 self.assertRaises, 

841 "exception", 

842 expected_exception, 

843 expected_message, 

844 *args, 

845 **kwargs, 

846 ) 

847 

848 def assertWarnsMessage(self, expected_warning, expected_message, *args, **kwargs): 

849 """ 

850 Same as assertRaisesMessage but for assertWarns() instead of 

851 assertRaises(). 

852 """ 

853 return self._assertFooMessage( 

854 self.assertWarns, 

855 "warning", 

856 expected_warning, 

857 expected_message, 

858 *args, 

859 **kwargs, 

860 ) 

861 

862 # A similar method is available in Python 3.10+. 

863 if not PY310: 863 ↛ 888line 863 didn't jump to line 888, because the condition on line 863 was never false

864 

865 @contextmanager 

866 def assertNoLogs(self, logger, level=None): 

867 """ 

868 Assert no messages are logged on the logger, with at least the 

869 given level. 

870 """ 

871 if isinstance(level, int): 

872 level = logging.getLevelName(level) 

873 elif level is None: 

874 level = "INFO" 

875 try: 

876 with self.assertLogs(logger, level) as cm: 

877 yield 

878 except AssertionError as e: 

879 msg = e.args[0] 

880 expected_msg = ( 

881 f"no logs of level {level} or higher triggered on {logger}" 

882 ) 

883 if msg != expected_msg: 

884 raise e 

885 else: 

886 self.fail(f"Unexpected logs found: {cm.output!r}") 

887 

888 def assertFieldOutput( 

889 self, 

890 fieldclass, 

891 valid, 

892 invalid, 

893 field_args=None, 

894 field_kwargs=None, 

895 empty_value="", 

896 ): 

897 """ 

898 Assert that a form field behaves correctly with various inputs. 

899 

900 Args: 

901 fieldclass: the class of the field to be tested. 

902 valid: a dictionary mapping valid inputs to their expected 

903 cleaned values. 

904 invalid: a dictionary mapping invalid inputs to one or more 

905 raised error messages. 

906 field_args: the args passed to instantiate the field 

907 field_kwargs: the kwargs passed to instantiate the field 

908 empty_value: the expected clean output for inputs in empty_values 

909 """ 

910 if field_args is None: 

911 field_args = [] 

912 if field_kwargs is None: 

913 field_kwargs = {} 

914 required = fieldclass(*field_args, **field_kwargs) 

915 optional = fieldclass(*field_args, **{**field_kwargs, "required": False}) 

916 # test valid inputs 

917 for input, output in valid.items(): 

918 self.assertEqual(required.clean(input), output) 

919 self.assertEqual(optional.clean(input), output) 

920 # test invalid inputs 

921 for input, errors in invalid.items(): 

922 with self.assertRaises(ValidationError) as context_manager: 

923 required.clean(input) 

924 self.assertEqual(context_manager.exception.messages, errors) 

925 

926 with self.assertRaises(ValidationError) as context_manager: 

927 optional.clean(input) 

928 self.assertEqual(context_manager.exception.messages, errors) 

929 # test required inputs 

930 error_required = [required.error_messages["required"]] 

931 for e in required.empty_values: 

932 with self.assertRaises(ValidationError) as context_manager: 

933 required.clean(e) 

934 self.assertEqual(context_manager.exception.messages, error_required) 

935 self.assertEqual(optional.clean(e), empty_value) 

936 # test that max_length and min_length are always accepted 

937 if issubclass(fieldclass, CharField): 

938 field_kwargs.update({"min_length": 2, "max_length": 20}) 

939 self.assertIsInstance(fieldclass(*field_args, **field_kwargs), fieldclass) 

940 

941 def assertHTMLEqual(self, html1, html2, msg=None): 

942 """ 

943 Assert that two HTML snippets are semantically the same. 

944 Whitespace in most cases is ignored, and attribute ordering is not 

945 significant. The arguments must be valid HTML. 

946 """ 

947 dom1 = assert_and_parse_html( 

948 self, html1, msg, "First argument is not valid HTML:" 

949 ) 

950 dom2 = assert_and_parse_html( 

951 self, html2, msg, "Second argument is not valid HTML:" 

952 ) 

953 

954 if dom1 != dom2: 

955 standardMsg = "%s != %s" % (safe_repr(dom1, True), safe_repr(dom2, True)) 

956 diff = "\n" + "\n".join( 

957 difflib.ndiff( 

958 str(dom1).splitlines(), 

959 str(dom2).splitlines(), 

960 ) 

961 ) 

962 standardMsg = self._truncateMessage(standardMsg, diff) 

963 self.fail(self._formatMessage(msg, standardMsg)) 

964 

965 def assertHTMLNotEqual(self, html1, html2, msg=None): 

966 """Assert that two HTML snippets are not semantically equivalent.""" 

967 dom1 = assert_and_parse_html( 

968 self, html1, msg, "First argument is not valid HTML:" 

969 ) 

970 dom2 = assert_and_parse_html( 

971 self, html2, msg, "Second argument is not valid HTML:" 

972 ) 

973 

974 if dom1 == dom2: 

975 standardMsg = "%s == %s" % (safe_repr(dom1, True), safe_repr(dom2, True)) 

976 self.fail(self._formatMessage(msg, standardMsg)) 

977 

978 def assertInHTML(self, needle, haystack, count=None, msg_prefix=""): 

979 needle = assert_and_parse_html( 

980 self, needle, None, "First argument is not valid HTML:" 

981 ) 

982 haystack = assert_and_parse_html( 

983 self, haystack, None, "Second argument is not valid HTML:" 

984 ) 

985 real_count = haystack.count(needle) 

986 if count is not None: 

987 self.assertEqual( 

988 real_count, 

989 count, 

990 msg_prefix 

991 + "Found %d instances of '%s' in response (expected %d)" 

992 % (real_count, needle, count), 

993 ) 

994 else: 

995 self.assertTrue( 

996 real_count != 0, msg_prefix + "Couldn't find '%s' in response" % needle 

997 ) 

998 

999 def assertJSONEqual(self, raw, expected_data, msg=None): 

1000 """ 

1001 Assert that the JSON fragments raw and expected_data are equal. 

1002 Usual JSON non-significant whitespace rules apply as the heavyweight 

1003 is delegated to the json library. 

1004 """ 

1005 try: 

1006 data = json.loads(raw) 

1007 except json.JSONDecodeError: 

1008 self.fail("First argument is not valid JSON: %r" % raw) 

1009 if isinstance(expected_data, str): 

1010 try: 

1011 expected_data = json.loads(expected_data) 

1012 except ValueError: 

1013 self.fail("Second argument is not valid JSON: %r" % expected_data) 

1014 self.assertEqual(data, expected_data, msg=msg) 

1015 

1016 def assertJSONNotEqual(self, raw, expected_data, msg=None): 

1017 """ 

1018 Assert that the JSON fragments raw and expected_data are not equal. 

1019 Usual JSON non-significant whitespace rules apply as the heavyweight 

1020 is delegated to the json library. 

1021 """ 

1022 try: 

1023 data = json.loads(raw) 

1024 except json.JSONDecodeError: 

1025 self.fail("First argument is not valid JSON: %r" % raw) 

1026 if isinstance(expected_data, str): 

1027 try: 

1028 expected_data = json.loads(expected_data) 

1029 except json.JSONDecodeError: 

1030 self.fail("Second argument is not valid JSON: %r" % expected_data) 

1031 self.assertNotEqual(data, expected_data, msg=msg) 

1032 

1033 def assertXMLEqual(self, xml1, xml2, msg=None): 

1034 """ 

1035 Assert that two XML snippets are semantically the same. 

1036 Whitespace in most cases is ignored and attribute ordering is not 

1037 significant. The arguments must be valid XML. 

1038 """ 

1039 try: 

1040 result = compare_xml(xml1, xml2) 

1041 except Exception as e: 

1042 standardMsg = "First or second argument is not valid XML\n%s" % e 

1043 self.fail(self._formatMessage(msg, standardMsg)) 

1044 else: 

1045 if not result: 

1046 standardMsg = "%s != %s" % ( 

1047 safe_repr(xml1, True), 

1048 safe_repr(xml2, True), 

1049 ) 

1050 diff = "\n" + "\n".join( 

1051 difflib.ndiff(xml1.splitlines(), xml2.splitlines()) 

1052 ) 

1053 standardMsg = self._truncateMessage(standardMsg, diff) 

1054 self.fail(self._formatMessage(msg, standardMsg)) 

1055 

1056 def assertXMLNotEqual(self, xml1, xml2, msg=None): 

1057 """ 

1058 Assert that two XML snippets are not semantically equivalent. 

1059 Whitespace in most cases is ignored and attribute ordering is not 

1060 significant. The arguments must be valid XML. 

1061 """ 

1062 try: 

1063 result = compare_xml(xml1, xml2) 

1064 except Exception as e: 

1065 standardMsg = "First or second argument is not valid XML\n%s" % e 

1066 self.fail(self._formatMessage(msg, standardMsg)) 

1067 else: 

1068 if result: 

1069 standardMsg = "%s == %s" % ( 

1070 safe_repr(xml1, True), 

1071 safe_repr(xml2, True), 

1072 ) 

1073 self.fail(self._formatMessage(msg, standardMsg)) 

1074 

1075 

1076class TransactionTestCase(SimpleTestCase): 

1077 

1078 # Subclasses can ask for resetting of auto increment sequence before each 

1079 # test case 

1080 reset_sequences = False 

1081 

1082 # Subclasses can enable only a subset of apps for faster tests 

1083 available_apps = None 

1084 

1085 # Subclasses can define fixtures which will be automatically installed. 

1086 fixtures = None 

1087 

1088 databases = {DEFAULT_DB_ALIAS} 

1089 _disallowed_database_msg = ( 

1090 "Database %(operation)s to %(alias)r are not allowed in this test. " 

1091 "Add %(alias)r to %(test)s.databases to ensure proper test isolation " 

1092 "and silence this failure." 

1093 ) 

1094 

1095 # If transactions aren't available, Django will serialize the database 

1096 # contents into a fixture during setup and flush and reload them 

1097 # during teardown (as flush does not restore data from migrations). 

1098 # This can be slow; this flag allows enabling on a per-case basis. 

1099 serialized_rollback = False 

1100 

1101 def _pre_setup(self): 

1102 """ 

1103 Perform pre-test setup: 

1104 * If the class has an 'available_apps' attribute, restrict the app 

1105 registry to these applications, then fire the post_migrate signal -- 

1106 it must run with the correct set of applications for the test case. 

1107 * If the class has a 'fixtures' attribute, install those fixtures. 

1108 """ 

1109 super()._pre_setup() 

1110 if self.available_apps is not None: 1110 ↛ 1111line 1110 didn't jump to line 1111, because the condition on line 1110 was never true

1111 apps.set_available_apps(self.available_apps) 

1112 setting_changed.send( 

1113 sender=settings._wrapped.__class__, 

1114 setting="INSTALLED_APPS", 

1115 value=self.available_apps, 

1116 enter=True, 

1117 ) 

1118 for db_name in self._databases_names(include_mirrors=False): 

1119 emit_post_migrate_signal(verbosity=0, interactive=False, db=db_name) 

1120 try: 

1121 self._fixture_setup() 

1122 except Exception: 

1123 if self.available_apps is not None: 

1124 apps.unset_available_apps() 

1125 setting_changed.send( 

1126 sender=settings._wrapped.__class__, 

1127 setting="INSTALLED_APPS", 

1128 value=settings.INSTALLED_APPS, 

1129 enter=False, 

1130 ) 

1131 raise 

1132 # Clear the queries_log so that it's less likely to overflow (a single 

1133 # test probably won't execute 9K queries). If queries_log overflows, 

1134 # then assertNumQueries() doesn't work. 

1135 for db_name in self._databases_names(include_mirrors=False): 

1136 connections[db_name].queries_log.clear() 

1137 

1138 @classmethod 

1139 def _databases_names(cls, include_mirrors=True): 

1140 # Only consider allowed database aliases, including mirrors or not. 

1141 return [ 

1142 alias 

1143 for alias in connections 

1144 if alias in cls.databases 

1145 and ( 

1146 include_mirrors 

1147 or not connections[alias].settings_dict["TEST"]["MIRROR"] 

1148 ) 

1149 ] 

1150 

1151 def _reset_sequences(self, db_name): 

1152 conn = connections[db_name] 

1153 if conn.features.supports_sequence_reset: 

1154 sql_list = conn.ops.sequence_reset_by_name_sql( 

1155 no_style(), conn.introspection.sequence_list() 

1156 ) 

1157 if sql_list: 

1158 with transaction.atomic(using=db_name): 

1159 with conn.cursor() as cursor: 

1160 for sql in sql_list: 

1161 cursor.execute(sql) 

1162 

1163 def _fixture_setup(self): 

1164 for db_name in self._databases_names(include_mirrors=False): 

1165 # Reset sequences 

1166 if self.reset_sequences: 

1167 self._reset_sequences(db_name) 

1168 

1169 # Provide replica initial data from migrated apps, if needed. 

1170 if self.serialized_rollback and hasattr( 

1171 connections[db_name], "_test_serialized_contents" 

1172 ): 

1173 if self.available_apps is not None: 

1174 apps.unset_available_apps() 

1175 connections[db_name].creation.deserialize_db_from_string( 

1176 connections[db_name]._test_serialized_contents 

1177 ) 

1178 if self.available_apps is not None: 

1179 apps.set_available_apps(self.available_apps) 

1180 

1181 if self.fixtures: 

1182 # We have to use this slightly awkward syntax due to the fact 

1183 # that we're using *args and **kwargs together. 

1184 call_command( 

1185 "loaddata", *self.fixtures, **{"verbosity": 0, "database": db_name} 

1186 ) 

1187 

1188 def _should_reload_connections(self): 

1189 return True 

1190 

1191 def _post_teardown(self): 

1192 """ 

1193 Perform post-test things: 

1194 * Flush the contents of the database to leave a clean slate. If the 

1195 class has an 'available_apps' attribute, don't fire post_migrate. 

1196 * Force-close the connection so the next test gets a clean cursor. 

1197 """ 

1198 try: 

1199 self._fixture_teardown() 

1200 super()._post_teardown() 

1201 if self._should_reload_connections(): 1201 ↛ 1208line 1201 didn't jump to line 1208, because the condition on line 1201 was never true

1202 # Some DB cursors include SQL statements as part of cursor 

1203 # creation. If you have a test that does a rollback, the effect 

1204 # of these statements is lost, which can affect the operation of 

1205 # tests (e.g., losing a timezone setting causing objects to be 

1206 # created with the wrong time). To make sure this doesn't 

1207 # happen, get a clean connection at the start of every test. 

1208 for conn in connections.all(): 

1209 conn.close() 

1210 finally: 

1211 if self.available_apps is not None: 1211 ↛ 1212line 1211 didn't jump to line 1212, because the condition on line 1211 was never true

1212 apps.unset_available_apps() 

1213 setting_changed.send( 

1214 sender=settings._wrapped.__class__, 

1215 setting="INSTALLED_APPS", 

1216 value=settings.INSTALLED_APPS, 

1217 enter=False, 

1218 ) 

1219 

1220 def _fixture_teardown(self): 

1221 # Allow TRUNCATE ... CASCADE and don't emit the post_migrate signal 

1222 # when flushing only a subset of the apps 

1223 for db_name in self._databases_names(include_mirrors=False): 

1224 # Flush the database 

1225 inhibit_post_migrate = ( 

1226 self.available_apps is not None 

1227 or ( # Inhibit the post_migrate signal when using serialized 

1228 # rollback to avoid trying to recreate the serialized data. 

1229 self.serialized_rollback 

1230 and hasattr(connections[db_name], "_test_serialized_contents") 

1231 ) 

1232 ) 

1233 call_command( 

1234 "flush", 

1235 verbosity=0, 

1236 interactive=False, 

1237 database=db_name, 

1238 reset_sequences=False, 

1239 allow_cascade=self.available_apps is not None, 

1240 inhibit_post_migrate=inhibit_post_migrate, 

1241 ) 

1242 

1243 def assertQuerysetEqual(self, qs, values, transform=None, ordered=True, msg=None): 

1244 values = list(values) 

1245 # RemovedInDjango41Warning. 

1246 if transform is None: 

1247 if ( 

1248 values 

1249 and isinstance(values[0], str) 

1250 and qs 

1251 and not isinstance(qs[0], str) 

1252 ): 

1253 # Transform qs using repr() if the first element of values is a 

1254 # string and the first element of qs is not (which would be the 

1255 # case if qs is a flattened values_list). 

1256 warnings.warn( 

1257 "In Django 4.1, repr() will not be called automatically " 

1258 "on a queryset when compared to string values. Set an " 

1259 "explicit 'transform' to silence this warning.", 

1260 category=RemovedInDjango41Warning, 

1261 stacklevel=2, 

1262 ) 

1263 transform = repr 

1264 items = qs 

1265 if transform is not None: 

1266 items = map(transform, items) 

1267 if not ordered: 

1268 return self.assertDictEqual(Counter(items), Counter(values), msg=msg) 

1269 # For example qs.iterator() could be passed as qs, but it does not 

1270 # have 'ordered' attribute. 

1271 if len(values) > 1 and hasattr(qs, "ordered") and not qs.ordered: 

1272 raise ValueError( 

1273 "Trying to compare non-ordered queryset against more than one " 

1274 "ordered value." 

1275 ) 

1276 return self.assertEqual(list(items), values, msg=msg) 

1277 

1278 def assertNumQueries(self, num, func=None, *args, using=DEFAULT_DB_ALIAS, **kwargs): 

1279 conn = connections[using] 

1280 

1281 context = _AssertNumQueriesContext(self, num, conn) 

1282 if func is None: 

1283 return context 

1284 

1285 with context: 

1286 func(*args, **kwargs) 

1287 

1288 

1289def connections_support_transactions(aliases=None): 

1290 """ 

1291 Return whether or not all (or specified) connections support 

1292 transactions. 

1293 """ 

1294 conns = ( 

1295 connections.all() 

1296 if aliases is None 

1297 else (connections[alias] for alias in aliases) 

1298 ) 

1299 return all(conn.features.supports_transactions for conn in conns) 

1300 

1301 

1302class TestData: 

1303 """ 

1304 Descriptor to provide TestCase instance isolation for attributes assigned 

1305 during the setUpTestData() phase. 

1306 

1307 Allow safe alteration of objects assigned in setUpTestData() by test 

1308 methods by exposing deep copies instead of the original objects. 

1309 

1310 Objects are deep copied using a memo kept on the test case instance in 

1311 order to maintain their original relationships. 

1312 """ 

1313 

1314 memo_attr = "_testdata_memo" 

1315 

1316 def __init__(self, name, data): 

1317 self.name = name 

1318 self.data = data 

1319 

1320 def get_memo(self, testcase): 

1321 try: 

1322 memo = getattr(testcase, self.memo_attr) 

1323 except AttributeError: 

1324 memo = {} 

1325 setattr(testcase, self.memo_attr, memo) 

1326 return memo 

1327 

1328 def __get__(self, instance, owner): 

1329 if instance is None: 1329 ↛ 1330line 1329 didn't jump to line 1330, because the condition on line 1329 was never true

1330 return self.data 

1331 memo = self.get_memo(instance) 

1332 try: 

1333 data = deepcopy(self.data, memo) 

1334 except TypeError: 

1335 # RemovedInDjango41Warning. 

1336 msg = ( 

1337 "Assigning objects which don't support copy.deepcopy() during " 

1338 "setUpTestData() is deprecated. Either assign the %s " 

1339 "attribute during setUpClass() or setUp(), or add support for " 

1340 "deepcopy() to %s.%s.%s." 

1341 ) % ( 

1342 self.name, 

1343 owner.__module__, 

1344 owner.__qualname__, 

1345 self.name, 

1346 ) 

1347 warnings.warn(msg, category=RemovedInDjango41Warning, stacklevel=2) 

1348 data = self.data 

1349 setattr(instance, self.name, data) 

1350 return data 

1351 

1352 def __repr__(self): 

1353 return "<TestData: name=%r, data=%r>" % (self.name, self.data) 

1354 

1355 

1356class TestCase(TransactionTestCase): 

1357 """ 

1358 Similar to TransactionTestCase, but use `transaction.atomic()` to achieve 

1359 test isolation. 

1360 

1361 In most situations, TestCase should be preferred to TransactionTestCase as 

1362 it allows faster execution. However, there are some situations where using 

1363 TransactionTestCase might be necessary (e.g. testing some transactional 

1364 behavior). 

1365 

1366 On database backends with no transaction support, TestCase behaves as 

1367 TransactionTestCase. 

1368 """ 

1369 

1370 @classmethod 

1371 def _enter_atomics(cls): 

1372 """Open atomic blocks for multiple databases.""" 

1373 atomics = {} 

1374 for db_name in cls._databases_names(): 

1375 atomics[db_name] = transaction.atomic(using=db_name) 

1376 atomics[db_name].__enter__() 

1377 return atomics 

1378 

1379 @classmethod 

1380 def _rollback_atomics(cls, atomics): 

1381 """Rollback atomic blocks opened by the previous method.""" 

1382 for db_name in reversed(cls._databases_names()): 

1383 transaction.set_rollback(True, using=db_name) 

1384 atomics[db_name].__exit__(None, None, None) 

1385 

1386 @classmethod 

1387 def _databases_support_transactions(cls): 

1388 return connections_support_transactions(cls.databases) 

1389 

1390 @classmethod 

1391 def setUpClass(cls): 

1392 super().setUpClass() 

1393 if not cls._databases_support_transactions(): 1393 ↛ 1394line 1393 didn't jump to line 1394, because the condition on line 1393 was never true

1394 return 

1395 # Disable the durability check to allow testing durable atomic blocks 

1396 # in a transaction for performance reasons. 

1397 transaction.Atomic._ensure_durability = False 

1398 try: 

1399 cls.cls_atomics = cls._enter_atomics() 

1400 

1401 if cls.fixtures: 1401 ↛ 1402line 1401 didn't jump to line 1402, because the condition on line 1401 was never true

1402 for db_name in cls._databases_names(include_mirrors=False): 

1403 try: 

1404 call_command( 

1405 "loaddata", 

1406 *cls.fixtures, 

1407 **{"verbosity": 0, "database": db_name}, 

1408 ) 

1409 except Exception: 

1410 cls._rollback_atomics(cls.cls_atomics) 

1411 raise 

1412 pre_attrs = cls.__dict__.copy() 

1413 try: 

1414 cls.setUpTestData() 

1415 except Exception: 

1416 cls._rollback_atomics(cls.cls_atomics) 

1417 raise 

1418 for name, value in cls.__dict__.items(): 

1419 if value is not pre_attrs.get(name): 

1420 setattr(cls, name, TestData(name, value)) 

1421 except Exception: 

1422 transaction.Atomic._ensure_durability = True 

1423 raise 

1424 

1425 @classmethod 

1426 def tearDownClass(cls): 

1427 transaction.Atomic._ensure_durability = True 

1428 if cls._databases_support_transactions(): 1428 ↛ 1432line 1428 didn't jump to line 1432, because the condition on line 1428 was never false

1429 cls._rollback_atomics(cls.cls_atomics) 

1430 for conn in connections.all(): 

1431 conn.close() 

1432 super().tearDownClass() 

1433 

1434 @classmethod 

1435 def setUpTestData(cls): 

1436 """Load initial data for the TestCase.""" 

1437 pass 

1438 

1439 def _should_reload_connections(self): 

1440 if self._databases_support_transactions(): 1440 ↛ 1442line 1440 didn't jump to line 1442, because the condition on line 1440 was never false

1441 return False 

1442 return super()._should_reload_connections() 

1443 

1444 def _fixture_setup(self): 

1445 if not self._databases_support_transactions(): 1445 ↛ 1448line 1445 didn't jump to line 1448, because the condition on line 1445 was never true

1446 # If the backend does not support transactions, we should reload 

1447 # class data before each test 

1448 self.setUpTestData() 

1449 return super()._fixture_setup() 

1450 

1451 if self.reset_sequences: 1451 ↛ 1452line 1451 didn't jump to line 1452, because the condition on line 1451 was never true

1452 raise TypeError("reset_sequences cannot be used on TestCase instances") 

1453 self.atomics = self._enter_atomics() 

1454 

1455 def _fixture_teardown(self): 

1456 if not self._databases_support_transactions(): 1456 ↛ 1457line 1456 didn't jump to line 1457, because the condition on line 1456 was never true

1457 return super()._fixture_teardown() 

1458 try: 

1459 for db_name in reversed(self._databases_names()): 

1460 if self._should_check_constraints(connections[db_name]): 1460 ↛ 1459line 1460 didn't jump to line 1459, because the condition on line 1460 was never false

1461 connections[db_name].check_constraints() 

1462 finally: 

1463 self._rollback_atomics(self.atomics) 

1464 

1465 def _should_check_constraints(self, connection): 

1466 return ( 

1467 connection.features.can_defer_constraint_checks 

1468 and not connection.needs_rollback 

1469 and connection.is_usable() 

1470 ) 

1471 

1472 @classmethod 

1473 @contextmanager 

1474 def captureOnCommitCallbacks(cls, *, using=DEFAULT_DB_ALIAS, execute=False): 

1475 """Context manager to capture transaction.on_commit() callbacks.""" 

1476 callbacks = [] 

1477 start_count = len(connections[using].run_on_commit) 

1478 try: 

1479 yield callbacks 

1480 finally: 

1481 while True: 

1482 callback_count = len(connections[using].run_on_commit) 

1483 for _, callback in connections[using].run_on_commit[start_count:]: 

1484 callbacks.append(callback) 

1485 if execute: 

1486 callback() 

1487 

1488 if callback_count == len(connections[using].run_on_commit): 

1489 break 

1490 start_count = callback_count 

1491 

1492 

1493class CheckCondition: 

1494 """Descriptor class for deferred condition checking.""" 

1495 

1496 def __init__(self, *conditions): 

1497 self.conditions = conditions 

1498 

1499 def add_condition(self, condition, reason): 

1500 return self.__class__(*self.conditions, (condition, reason)) 

1501 

1502 def __get__(self, instance, cls=None): 

1503 # Trigger access for all bases. 

1504 if any(getattr(base, "__unittest_skip__", False) for base in cls.__bases__): 

1505 return True 

1506 for condition, reason in self.conditions: 

1507 if condition(): 

1508 # Override this descriptor's value and set the skip reason. 

1509 cls.__unittest_skip__ = True 

1510 cls.__unittest_skip_why__ = reason 

1511 return True 

1512 return False 

1513 

1514 

1515def _deferredSkip(condition, reason, name): 

1516 def decorator(test_func): 

1517 nonlocal condition 

1518 if not ( 

1519 isinstance(test_func, type) and issubclass(test_func, unittest.TestCase) 

1520 ): 

1521 

1522 @wraps(test_func) 

1523 def skip_wrapper(*args, **kwargs): 

1524 if ( 

1525 args 

1526 and isinstance(args[0], unittest.TestCase) 

1527 and connection.alias not in getattr(args[0], "databases", {}) 

1528 ): 

1529 raise ValueError( 

1530 "%s cannot be used on %s as %s doesn't allow queries " 

1531 "against the %r database." 

1532 % ( 

1533 name, 

1534 args[0], 

1535 args[0].__class__.__qualname__, 

1536 connection.alias, 

1537 ) 

1538 ) 

1539 if condition(): 

1540 raise unittest.SkipTest(reason) 

1541 return test_func(*args, **kwargs) 

1542 

1543 test_item = skip_wrapper 

1544 else: 

1545 # Assume a class is decorated 

1546 test_item = test_func 

1547 databases = getattr(test_item, "databases", None) 

1548 if not databases or connection.alias not in databases: 

1549 # Defer raising to allow importing test class's module. 

1550 def condition(): 

1551 raise ValueError( 

1552 "%s cannot be used on %s as it doesn't allow queries " 

1553 "against the '%s' database." 

1554 % ( 

1555 name, 

1556 test_item, 

1557 connection.alias, 

1558 ) 

1559 ) 

1560 

1561 # Retrieve the possibly existing value from the class's dict to 

1562 # avoid triggering the descriptor. 

1563 skip = test_func.__dict__.get("__unittest_skip__") 

1564 if isinstance(skip, CheckCondition): 

1565 test_item.__unittest_skip__ = skip.add_condition(condition, reason) 

1566 elif skip is not True: 

1567 test_item.__unittest_skip__ = CheckCondition((condition, reason)) 

1568 return test_item 

1569 

1570 return decorator 

1571 

1572 

1573def skipIfDBFeature(*features): 

1574 """Skip a test if a database has at least one of the named features.""" 

1575 return _deferredSkip( 

1576 lambda: any( 

1577 getattr(connection.features, feature, False) for feature in features 

1578 ), 

1579 "Database has feature(s) %s" % ", ".join(features), 

1580 "skipIfDBFeature", 

1581 ) 

1582 

1583 

1584def skipUnlessDBFeature(*features): 

1585 """Skip a test unless a database has all the named features.""" 

1586 return _deferredSkip( 

1587 lambda: not all( 

1588 getattr(connection.features, feature, False) for feature in features 

1589 ), 

1590 "Database doesn't support feature(s): %s" % ", ".join(features), 

1591 "skipUnlessDBFeature", 

1592 ) 

1593 

1594 

1595def skipUnlessAnyDBFeature(*features): 

1596 """Skip a test unless a database has any of the named features.""" 

1597 return _deferredSkip( 

1598 lambda: not any( 

1599 getattr(connection.features, feature, False) for feature in features 

1600 ), 

1601 "Database doesn't support any of the feature(s): %s" % ", ".join(features), 

1602 "skipUnlessAnyDBFeature", 

1603 ) 

1604 

1605 

1606class QuietWSGIRequestHandler(WSGIRequestHandler): 

1607 """ 

1608 A WSGIRequestHandler that doesn't log to standard output any of the 

1609 requests received, so as to not clutter the test result output. 

1610 """ 

1611 

1612 def log_message(*args): 

1613 pass 

1614 

1615 

1616class FSFilesHandler(WSGIHandler): 

1617 """ 

1618 WSGI middleware that intercepts calls to a directory, as defined by one of 

1619 the *_ROOT settings, and serves those files, publishing them under *_URL. 

1620 """ 

1621 

1622 def __init__(self, application): 

1623 self.application = application 

1624 self.base_url = urlparse(self.get_base_url()) 

1625 super().__init__() 

1626 

1627 def _should_handle(self, path): 

1628 """ 

1629 Check if the path should be handled. Ignore the path if: 

1630 * the host is provided as part of the base_url 

1631 * the request's path isn't under the media path (or equal) 

1632 """ 

1633 return path.startswith(self.base_url[2]) and not self.base_url[1] 

1634 

1635 def file_path(self, url): 

1636 """Return the relative path to the file on disk for the given URL.""" 

1637 relative_url = url[len(self.base_url[2]) :] 

1638 return url2pathname(relative_url) 

1639 

1640 def get_response(self, request): 

1641 from django.http import Http404 

1642 

1643 if self._should_handle(request.path): 

1644 try: 

1645 return self.serve(request) 

1646 except Http404: 

1647 pass 

1648 return super().get_response(request) 

1649 

1650 def serve(self, request): 

1651 os_rel_path = self.file_path(request.path) 

1652 os_rel_path = posixpath.normpath(unquote(os_rel_path)) 

1653 # Emulate behavior of django.contrib.staticfiles.views.serve() when it 

1654 # invokes staticfiles' finders functionality. 

1655 # TODO: Modify if/when that internal API is refactored 

1656 final_rel_path = os_rel_path.replace("\\", "/").lstrip("/") 

1657 return serve(request, final_rel_path, document_root=self.get_base_dir()) 

1658 

1659 def __call__(self, environ, start_response): 

1660 if not self._should_handle(get_path_info(environ)): 

1661 return self.application(environ, start_response) 

1662 return super().__call__(environ, start_response) 

1663 

1664 

1665class _StaticFilesHandler(FSFilesHandler): 

1666 """ 

1667 Handler for serving static files. A private class that is meant to be used 

1668 solely as a convenience by LiveServerThread. 

1669 """ 

1670 

1671 def get_base_dir(self): 

1672 return settings.STATIC_ROOT 

1673 

1674 def get_base_url(self): 

1675 return settings.STATIC_URL 

1676 

1677 

1678class _MediaFilesHandler(FSFilesHandler): 

1679 """ 

1680 Handler for serving the media files. A private class that is meant to be 

1681 used solely as a convenience by LiveServerThread. 

1682 """ 

1683 

1684 def get_base_dir(self): 

1685 return settings.MEDIA_ROOT 

1686 

1687 def get_base_url(self): 

1688 return settings.MEDIA_URL 

1689 

1690 

1691class LiveServerThread(threading.Thread): 

1692 """Thread for running a live HTTP server while the tests are running.""" 

1693 

1694 server_class = ThreadedWSGIServer 

1695 

1696 def __init__(self, host, static_handler, connections_override=None, port=0): 

1697 self.host = host 

1698 self.port = port 

1699 self.is_ready = threading.Event() 

1700 self.error = None 

1701 self.static_handler = static_handler 

1702 self.connections_override = connections_override 

1703 super().__init__() 

1704 

1705 def run(self): 

1706 """ 

1707 Set up the live server and databases, and then loop over handling 

1708 HTTP requests. 

1709 """ 

1710 if self.connections_override: 

1711 # Override this thread's database connections with the ones 

1712 # provided by the main thread. 

1713 for alias, conn in self.connections_override.items(): 

1714 connections[alias] = conn 

1715 try: 

1716 # Create the handler for serving static and media files 

1717 handler = self.static_handler(_MediaFilesHandler(WSGIHandler())) 

1718 self.httpd = self._create_server() 

1719 # If binding to port zero, assign the port allocated by the OS. 

1720 if self.port == 0: 

1721 self.port = self.httpd.server_address[1] 

1722 self.httpd.set_app(handler) 

1723 self.is_ready.set() 

1724 self.httpd.serve_forever() 

1725 except Exception as e: 

1726 self.error = e 

1727 self.is_ready.set() 

1728 finally: 

1729 connections.close_all() 

1730 

1731 def _create_server(self, connections_override=None): 

1732 return self.server_class( 

1733 (self.host, self.port), 

1734 QuietWSGIRequestHandler, 

1735 allow_reuse_address=False, 

1736 connections_override=connections_override, 

1737 ) 

1738 

1739 def terminate(self): 

1740 if hasattr(self, "httpd"): 

1741 # Stop the WSGI server 

1742 self.httpd.shutdown() 

1743 self.httpd.server_close() 

1744 self.join() 

1745 

1746 

1747class LiveServerTestCase(TransactionTestCase): 

1748 """ 

1749 Do basically the same as TransactionTestCase but also launch a live HTTP 

1750 server in a separate thread so that the tests may use another testing 

1751 framework, such as Selenium for example, instead of the built-in dummy 

1752 client. 

1753 It inherits from TransactionTestCase instead of TestCase because the 

1754 threads don't share the same transactions (unless if using in-memory sqlite) 

1755 and each thread needs to commit all their transactions so that the other 

1756 thread can see the changes. 

1757 """ 

1758 

1759 host = "localhost" 

1760 port = 0 

1761 server_thread_class = LiveServerThread 

1762 static_handler = _StaticFilesHandler 

1763 

1764 @classproperty 

1765 def live_server_url(cls): 

1766 return "http://%s:%s" % (cls.host, cls.server_thread.port) 

1767 

1768 @classproperty 

1769 def allowed_host(cls): 

1770 return cls.host 

1771 

1772 @classmethod 

1773 def _make_connections_override(cls): 

1774 connections_override = {} 

1775 for conn in connections.all(): 

1776 # If using in-memory sqlite databases, pass the connections to 

1777 # the server thread. 

1778 if conn.vendor == "sqlite" and conn.is_in_memory_db(): 

1779 connections_override[conn.alias] = conn 

1780 return connections_override 

1781 

1782 @classmethod 

1783 def setUpClass(cls): 

1784 super().setUpClass() 

1785 cls._live_server_modified_settings = modify_settings( 

1786 ALLOWED_HOSTS={"append": cls.allowed_host}, 

1787 ) 

1788 cls._live_server_modified_settings.enable() 

1789 

1790 connections_override = cls._make_connections_override() 

1791 for conn in connections_override.values(): 

1792 # Explicitly enable thread-shareability for this connection. 

1793 conn.inc_thread_sharing() 

1794 

1795 cls.server_thread = cls._create_server_thread(connections_override) 

1796 cls.server_thread.daemon = True 

1797 cls.server_thread.start() 

1798 

1799 # Wait for the live server to be ready 

1800 cls.server_thread.is_ready.wait() 

1801 if cls.server_thread.error: 

1802 # Clean up behind ourselves, since tearDownClass won't get called in 

1803 # case of errors. 

1804 cls._tearDownClassInternal() 

1805 raise cls.server_thread.error 

1806 

1807 @classmethod 

1808 def _create_server_thread(cls, connections_override): 

1809 return cls.server_thread_class( 

1810 cls.host, 

1811 cls.static_handler, 

1812 connections_override=connections_override, 

1813 port=cls.port, 

1814 ) 

1815 

1816 @classmethod 

1817 def _tearDownClassInternal(cls): 

1818 # Terminate the live server's thread. 

1819 cls.server_thread.terminate() 

1820 # Restore shared connections' non-shareability. 

1821 for conn in cls.server_thread.connections_override.values(): 

1822 conn.dec_thread_sharing() 

1823 

1824 cls._live_server_modified_settings.disable() 

1825 super().tearDownClass() 

1826 

1827 @classmethod 

1828 def tearDownClass(cls): 

1829 cls._tearDownClassInternal() 

1830 

1831 

1832class SerializeMixin: 

1833 """ 

1834 Enforce serialization of TestCases that share a common resource. 

1835 

1836 Define a common 'lockfile' for each set of TestCases to serialize. This 

1837 file must exist on the filesystem. 

1838 

1839 Place it early in the MRO in order to isolate setUpClass()/tearDownClass(). 

1840 """ 

1841 

1842 lockfile = None 

1843 

1844 def __init_subclass__(cls, /, **kwargs): 

1845 super().__init_subclass__(**kwargs) 

1846 if cls.lockfile is None: 

1847 raise ValueError( 

1848 "{}.lockfile isn't set. Set it to a unique value " 

1849 "in the base class.".format(cls.__name__) 

1850 ) 

1851 

1852 @classmethod 

1853 def setUpClass(cls): 

1854 cls._lockfile = open(cls.lockfile) 

1855 cls.addClassCleanup(cls._lockfile.close) 

1856 locks.lock(cls._lockfile, locks.LOCK_EX) 

1857 super().setUpClass()