Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/test/client.py: 43%

463 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1import json 

2import mimetypes 

3import os 

4import sys 

5from copy import copy 

6from functools import partial 

7from http import HTTPStatus 

8from importlib import import_module 

9from io import BytesIO 

10from urllib.parse import unquote_to_bytes, urljoin, urlparse, urlsplit 

11 

12from asgiref.sync import sync_to_async 

13 

14from django.conf import settings 

15from django.core.handlers.asgi import ASGIRequest 

16from django.core.handlers.base import BaseHandler 

17from django.core.handlers.wsgi import WSGIRequest 

18from django.core.serializers.json import DjangoJSONEncoder 

19from django.core.signals import got_request_exception, request_finished, request_started 

20from django.db import close_old_connections 

21from django.http import HttpRequest, QueryDict, SimpleCookie 

22from django.test import signals 

23from django.test.utils import ContextList 

24from django.urls import resolve 

25from django.utils.encoding import force_bytes 

26from django.utils.functional import SimpleLazyObject 

27from django.utils.http import urlencode 

28from django.utils.itercompat import is_iterable 

29from django.utils.regex_helper import _lazy_re_compile 

30 

# Public names of this module.
__all__ = (
    "AsyncClient",
    "AsyncRequestFactory",
    "Client",
    "RedirectCycleError",
    "RequestFactory",
    "encode_file",
    "encode_multipart",
)


# Boundary token placed between the parts of an encoded multipart body.
BOUNDARY = "BoUnDaRyStRiNg"
# Default content type for POST data. RequestFactory._encode_data() checks for
# this exact object (``is``), not merely an equal string.
MULTIPART_CONTENT = "multipart/form-data; boundary=%s" % BOUNDARY
# Captures the charset parameter of a Content-Type value.
CONTENT_TYPE_RE = _lazy_re_compile(r".*; charset=([\w\d-]+);?")
# Structured suffix spec: https://tools.ietf.org/html/rfc6838#section-4.2.8
JSON_CONTENT_TYPE_RE = _lazy_re_compile(r"^application\/(.+\+)?json")

47 

48 

class RedirectCycleError(Exception):
    """Raised when the test client has been asked to follow a redirect loop.

    The offending response is kept on ``last_response`` and its
    ``redirect_chain`` is copied onto the exception for convenience.
    """

    def __init__(self, message, last_response):
        super().__init__(message)
        chain = last_response.redirect_chain
        self.last_response = last_response
        self.redirect_chain = chain

56 

57 

class FakePayload:
    """
    A wrapper around BytesIO that restricts what can be read since data from
    the network can't be sought and cannot be read outside of its content
    length. This makes sure that views can't do anything under the test client
    that wouldn't work in real life.
    """

    def __init__(self, content=None):
        """Create the payload, optionally pre-filled with ``content``."""
        self.__content = BytesIO()
        # Number of bytes that are still available to read().
        self.__len = 0
        self.read_started = False
        if content is not None:
            self.write(content)

    def __len__(self):
        """Return the number of bytes that have not been read yet."""
        return self.__len

    def read(self, num_bytes=None):
        """
        Return up to ``num_bytes`` bytes and shrink the remaining length.

        ``None`` or a negative value means "everything that is left", the
        same convention used by the standard ``io`` streams. Asking for more
        than what is left raises AssertionError.
        """
        if not self.read_started:
            # The buffer position is at the end after write(); rewind once.
            self.__content.seek(0)
            self.read_started = True
        if num_bytes is None or num_bytes < 0:
            # A negative size used to be subtracted from the remaining length
            # below, which made len() grow after the payload was drained.
            num_bytes = self.__len
        assert (
            self.__len >= num_bytes
        ), "Cannot read more than the available bytes from the HTTP incoming data."
        content = self.__content.read(num_bytes)
        # Account for what was really handed out.
        self.__len -= len(content)
        return content

    def write(self, content):
        """
        Append ``content`` (coerced to bytes) to the payload.

        Raise ValueError once reading has started.
        """
        if self.read_started:
            raise ValueError("Unable to write a payload after it's been read")
        content = force_bytes(content)
        self.__content.write(content)
        self.__len += len(content)

95 

96 

def closing_iterator_wrapper(iterable, close):
    """
    Yield everything from ``iterable`` and call ``close`` afterwards.

    ``close_old_connections`` is detached from ``request_finished`` while
    ``close()`` runs and is attached again afterwards, even when ``close()``
    raises, so a failing close cannot leave the receiver disconnected.
    """
    try:
        yield from iterable
    finally:
        request_finished.disconnect(close_old_connections)
        try:
            close()  # will fire request_finished
        finally:
            request_finished.connect(close_old_connections)

104 

105 

def conditional_content_removal(request, response):
    """
    Simulate the behavior of most web servers by removing the content of
    responses for HEAD requests, 1xx, 204, and 304 responses. Ensure
    compliance with RFC 7230, section 3.3.3.

    The response is modified in place and returned.
    """

    def strip_body():
        # Streaming and regular responses keep their body in different
        # attributes.
        if response.streaming:
            response.streaming_content = []
        else:
            response.content = b""

    status = response.status_code
    if status in (204, 304) or 100 <= status < 200:
        strip_body()
    if request.method == "HEAD":
        strip_body()
    return response

123 

124 

class ClientHandler(BaseHandler):
    """
    An HTTP Handler that can be used for testing purposes. Use the WSGI
    interface to compose requests, but return the raw HttpResponse object with
    the originating WSGIRequest attached to its ``wsgi_request`` attribute.
    """

    def __init__(self, enforce_csrf_checks=True, *args, **kwargs):
        self.enforce_csrf_checks = enforce_csrf_checks
        super().__init__(*args, **kwargs)

    def __call__(self, environ):
        """Run the request described by ``environ`` and return the response."""
        # Set up middleware if needed. We couldn't do this earlier, because
        # settings weren't available.
        if self._middleware_chain is None:
            self.load_middleware()

        # close_old_connections is detached while the signal is sent and
        # re-attached afterwards, also when sending the signal raises.
        request_started.disconnect(close_old_connections)
        try:
            request_started.send(sender=self.__class__, environ=environ)
        finally:
            request_started.connect(close_old_connections)
        request = WSGIRequest(environ)
        # sneaky little hack so that we can easily get round
        # CsrfViewMiddleware. This makes life easier, and is probably
        # required for backwards compatibility with external tests against
        # admin views.
        request._dont_enforce_csrf_checks = not self.enforce_csrf_checks

        # Request goes through middleware.
        response = self.get_response(request)

        # Simulate behaviors of most web servers.
        conditional_content_removal(request, response)

        # Attach the originating request to the response so that it could be
        # later retrieved.
        response.wsgi_request = request

        # Emulate a WSGI server by calling the close method on completion.
        if response.streaming:
            response.streaming_content = closing_iterator_wrapper(
                response.streaming_content, response.close
            )
        else:
            request_finished.disconnect(close_old_connections)
            try:
                response.close()  # will fire request_finished
            finally:
                # Re-attach even if close() raised.
                request_finished.connect(close_old_connections)

        return response

173 

174 

class AsyncClientHandler(BaseHandler):
    """An async version of ClientHandler."""

    def __init__(self, enforce_csrf_checks=True, *args, **kwargs):
        self.enforce_csrf_checks = enforce_csrf_checks
        super().__init__(*args, **kwargs)

    async def __call__(self, scope):
        """Run the request described by ``scope`` and return the response."""
        # Set up middleware if needed. We couldn't do this earlier, because
        # settings weren't available.
        if self._middleware_chain is None:
            self.load_middleware(is_async=True)
        # Extract body file from the scope, if provided.
        if "_body_file" in scope:
            body_file = scope.pop("_body_file")
        else:
            body_file = FakePayload("")

        # close_old_connections is detached while the signal is sent and
        # re-attached afterwards, also when sending the signal raises.
        request_started.disconnect(close_old_connections)
        try:
            await sync_to_async(request_started.send, thread_sensitive=False)(
                sender=self.__class__, scope=scope
            )
        finally:
            request_started.connect(close_old_connections)
        request = ASGIRequest(scope, body_file)
        # Sneaky little hack so that we can easily get round
        # CsrfViewMiddleware. This makes life easier, and is probably required
        # for backwards compatibility with external tests against admin views.
        request._dont_enforce_csrf_checks = not self.enforce_csrf_checks
        # Request goes through middleware.
        response = await self.get_response_async(request)
        # Simulate behaviors of most web servers.
        conditional_content_removal(request, response)
        # Attach the originating ASGI request to the response so that it could
        # be later retrieved.
        response.asgi_request = request
        # Emulate a server by calling the close method on completion.
        if response.streaming:
            response.streaming_content = await sync_to_async(
                closing_iterator_wrapper, thread_sensitive=False
            )(
                response.streaming_content,
                response.close,
            )
        else:
            request_finished.disconnect(close_old_connections)
            try:
                # Will fire request_finished.
                await sync_to_async(response.close, thread_sensitive=False)()
            finally:
                # Re-attach even if close() raised.
                request_finished.connect(close_old_connections)
        return response

224 

225 

def store_rendered_templates(store, signal, sender, template, context, **kwargs):
    """
    Record a rendered template and its context in ``store``.

    Templates are collected under ``store["templates"]`` and contexts under
    ``store["context"]``. A copy of the context is stored so that it reflects
    the state at the time of rendering.
    """
    templates = store.setdefault("templates", [])
    templates.append(template)
    if "context" in store:
        contexts = store["context"]
    else:
        contexts = store["context"] = ContextList()
    contexts.append(copy(context))

237 

238 

def encode_multipart(boundary, data):
    """
    Encode multipart POST data from a dictionary of form values.

    Each key becomes a form field name and each value its content. File-like
    values are delegated to encode_file(); non-string iterables produce one
    part per item, since HTTP field names can be duplicated. A ``None`` value
    raises TypeError.
    """

    def to_bytes(s):
        return force_bytes(s, settings.DEFAULT_CHARSET)

    # Not by any means perfect, but good enough for our purposes.
    def is_file(thing):
        return hasattr(thing, "read") and callable(thing.read)

    def field_lines(name, content):
        # Lines of one plain (non-file) form-data part.
        return [
            to_bytes(part)
            for part in (
                "--%s" % boundary,
                'Content-Disposition: form-data; name="%s"' % name,
                "",
                content,
            )
        ]

    lines = []
    for key, value in data.items():
        if value is None:
            raise TypeError(
                "Cannot encode None for key '%s' as POST data. Did you mean "
                "to pass an empty string or omit the value?" % key
            )
        if is_file(value):
            lines.extend(encode_file(boundary, key, value))
        elif not isinstance(value, str) and is_iterable(value):
            for item in value:
                if is_file(item):
                    lines.extend(encode_file(boundary, key, item))
                else:
                    lines.extend(field_lines(key, item))
        else:
            lines.extend(field_lines(key, value))

    # Closing boundary, followed by an empty line so the body ends in CRLF.
    lines.append(to_bytes("--%s--" % boundary))
    lines.append(b"")
    return b"\r\n".join(lines)

299 

300 

def encode_file(boundary, key, file):
    """
    Return the list of byte lines that make up one multipart file part.

    The content type comes from ``file.content_type`` when that attribute
    exists, otherwise it is guessed from the file name, falling back to
    "application/octet-stream". When the file has no usable name, ``key`` is
    used as the filename.
    """

    def to_bytes(s):
        return force_bytes(s, settings.DEFAULT_CHARSET)

    # file.name might not be a string. For example, it's an int for
    # tempfile.TemporaryFile().
    if hasattr(file, "name") and isinstance(file.name, str):
        filename = os.path.basename(file.name)
    else:
        filename = ""

    if hasattr(file, "content_type"):
        content_type = file.content_type
    else:
        content_type = mimetypes.guess_type(filename)[0] if filename else None
    if content_type is None:
        content_type = "application/octet-stream"

    if not filename:
        filename = key
    disposition = 'Content-Disposition: form-data; name="%s"; filename="%s"' % (
        key,
        filename,
    )
    return [
        to_bytes("--%s" % boundary),
        to_bytes(disposition),
        to_bytes("Content-Type: %s" % content_type),
        b"",
        to_bytes(file.read()),
    ]

329 

330 

class RequestFactory:
    """
    Class that lets you create mock Request objects for use in testing.

    Usage:

    rf = RequestFactory()
    get_request = rf.get('/hello/')
    post_request = rf.post('/submit/', {'foo': 'bar'})

    Once you have a request object you can pass it to any view function,
    just as if that view had been hooked up using a URLconf.
    """

    def __init__(self, *, json_encoder=DjangoJSONEncoder, **defaults):
        self.json_encoder = json_encoder
        self.defaults = defaults
        self.cookies = SimpleCookie()
        self.errors = BytesIO()

    def _base_environ(self, **request):
        """
        The base environment for a request.
        """
        cookie_header = "; ".join(
            sorted(
                "%s=%s" % (morsel.key, morsel.coded_value)
                for morsel in self.cookies.values()
            )
        )
        # This is a minimal valid WSGI environ dictionary, plus:
        # - HTTP_COOKIE: for cookie support,
        # - REMOTE_ADDR: often useful, see #8551.
        # See https://www.python.org/dev/peps/pep-3333/#environ-variables
        environ = {
            "HTTP_COOKIE": cookie_header,
            "PATH_INFO": "/",
            "REMOTE_ADDR": "127.0.0.1",
            "REQUEST_METHOD": "GET",
            "SCRIPT_NAME": "",
            "SERVER_NAME": "testserver",
            "SERVER_PORT": "80",
            "SERVER_PROTOCOL": "HTTP/1.1",
            "wsgi.version": (1, 0),
            "wsgi.url_scheme": "http",
            "wsgi.input": FakePayload(b""),
            "wsgi.errors": self.errors,
            "wsgi.multiprocess": True,
            "wsgi.multithread": False,
            "wsgi.run_once": False,
        }
        # Factory-wide defaults first, then the per-request values on top.
        environ.update(self.defaults)
        environ.update(request)
        return environ

    def request(self, **request):
        "Construct a generic request object."
        environ = self._base_environ(**request)
        return WSGIRequest(environ)

    def _encode_data(self, data, content_type):
        """Return ``data`` as bytes suitable for a body of ``content_type``."""
        # Identity check: only the module's own MULTIPART_CONTENT object
        # triggers multipart encoding.
        if content_type is MULTIPART_CONTENT:
            return encode_multipart(BOUNDARY, data)
        # Encode the content so that the byte representation is correct.
        match = CONTENT_TYPE_RE.match(content_type)
        charset = match[1] if match else settings.DEFAULT_CHARSET
        return force_bytes(data, encoding=charset)

    def _encode_json(self, data, content_type):
        """
        Return encoded JSON if data is a dict, list, or tuple and content_type
        is application/json.
        """
        if JSON_CONTENT_TYPE_RE.match(content_type) and isinstance(
            data, (dict, list, tuple)
        ):
            return json.dumps(data, cls=self.json_encoder)
        return data

    def _get_path(self, parsed):
        """Return the unquoted path (plus params) of a parsed URL."""
        raw_path = parsed.path
        # If there are parameters, add them
        if parsed.params:
            raw_path += ";" + parsed.params
        # Replace the behavior where non-ASCII values in the WSGI environ are
        # arbitrarily decoded with ISO-8859-1.
        # Refs comment in `get_bytes_from_wsgi()`.
        return unquote_to_bytes(raw_path).decode("iso-8859-1")

    def get(self, path, data=None, secure=False, **extra):
        """Construct a GET request."""
        query = urlencode({} if data is None else data, doseq=True)
        # An explicit QUERY_STRING in ``extra`` wins over the encoded data.
        environ_extra = {"QUERY_STRING": query, **extra}
        return self.generic("GET", path, secure=secure, **environ_extra)

    def post(
        self, path, data=None, content_type=MULTIPART_CONTENT, secure=False, **extra
    ):
        """Construct a POST request."""
        if data is None:
            data = {}
        json_or_data = self._encode_json(data, content_type)
        post_data = self._encode_data(json_or_data, content_type)
        return self.generic(
            "POST", path, post_data, content_type, secure=secure, **extra
        )

    def head(self, path, data=None, secure=False, **extra):
        """Construct a HEAD request."""
        query = urlencode({} if data is None else data, doseq=True)
        # An explicit QUERY_STRING in ``extra`` wins over the encoded data.
        environ_extra = {"QUERY_STRING": query, **extra}
        return self.generic("HEAD", path, secure=secure, **environ_extra)

    def trace(self, path, secure=False, **extra):
        """Construct a TRACE request."""
        return self.generic("TRACE", path, secure=secure, **extra)

    def options(
        self,
        path,
        data="",
        content_type="application/octet-stream",
        secure=False,
        **extra,
    ):
        "Construct an OPTIONS request."
        return self.generic("OPTIONS", path, data, content_type, secure=secure, **extra)

    def put(
        self,
        path,
        data="",
        content_type="application/octet-stream",
        secure=False,
        **extra,
    ):
        """Construct a PUT request."""
        body = self._encode_json(data, content_type)
        return self.generic("PUT", path, body, content_type, secure=secure, **extra)

    def patch(
        self,
        path,
        data="",
        content_type="application/octet-stream",
        secure=False,
        **extra,
    ):
        """Construct a PATCH request."""
        body = self._encode_json(data, content_type)
        return self.generic("PATCH", path, body, content_type, secure=secure, **extra)

    def delete(
        self,
        path,
        data="",
        content_type="application/octet-stream",
        secure=False,
        **extra,
    ):
        """Construct a DELETE request."""
        body = self._encode_json(data, content_type)
        return self.generic("DELETE", path, body, content_type, secure=secure, **extra)

    def generic(
        self,
        method,
        path,
        data="",
        content_type="application/octet-stream",
        secure=False,
        **extra,
    ):
        """Construct an arbitrary HTTP request."""
        parsed = urlparse(str(path))  # path can be lazy
        body = force_bytes(data, settings.DEFAULT_CHARSET)
        environ = {
            "PATH_INFO": self._get_path(parsed),
            "REQUEST_METHOD": method,
            "SERVER_PORT": "443" if secure else "80",
            "wsgi.url_scheme": "https" if secure else "http",
        }
        if body:
            # Body-related keys are only set for a non-empty body.
            environ["CONTENT_LENGTH"] = str(len(body))
            environ["CONTENT_TYPE"] = content_type
            environ["wsgi.input"] = FakePayload(body)
        environ.update(extra)
        # If QUERY_STRING is absent or empty, we want to extract it from the URL.
        if not environ.get("QUERY_STRING"):
            # WSGI requires latin-1 encoded strings. See get_path_info().
            environ["QUERY_STRING"] = parsed[4].encode().decode("iso-8859-1")
        return self.request(**environ)

542 

543 

class AsyncRequestFactory(RequestFactory):
    """
    Class that lets you create mock ASGI-like Request objects for use in
    testing. Usage:

    rf = AsyncRequestFactory()
    get_request = await rf.get('/hello/')
    post_request = await rf.post('/submit/', {'foo': 'bar'})

    Once you have a request object you can pass it to any view function,
    including synchronous ones. The reason we have a separate class here is:
    a) this makes ASGIRequest subclasses, and
    b) AsyncTestClient can subclass it.
    """

    def _base_scope(self, **request):
        """The base scope for a request."""
        # This is a minimal valid ASGI scope, plus:
        # - headers['cookie'] for cookie support,
        # - 'client' often useful, see #8551.
        scope = {
            "asgi": {"version": "3.0"},
            "type": "http",
            "http_version": "1.1",
            "client": ["127.0.0.1", 0],
            "server": ("testserver", "80"),
            "scheme": "http",
            "method": "GET",
            "headers": [],
        }
        # Factory-wide defaults first, then the per-request values on top.
        scope.update(self.defaults)
        scope.update(request)
        cookie_pairs = sorted(
            ("%s=%s" % (morsel.key, morsel.coded_value)).encode("ascii")
            for morsel in self.cookies.values()
        )
        scope["headers"].append((b"cookie", b"; ".join(cookie_pairs)))
        return scope

    def request(self, **request):
        """Construct a generic request object."""
        # This is synchronous, which means all methods on this class are.
        # AsyncClient, however, has an async request function, which makes all
        # its methods async.
        body_file = (
            request.pop("_body_file") if "_body_file" in request else FakePayload("")
        )
        return ASGIRequest(self._base_scope(**request), body_file)

    def generic(
        self,
        method,
        path,
        data="",
        content_type="application/octet-stream",
        secure=False,
        **extra,
    ):
        """Construct an arbitrary HTTP request."""
        parsed = urlparse(str(path))  # path can be lazy.
        body = force_bytes(data, settings.DEFAULT_CHARSET)
        headers = [(b"host", b"testserver")]
        scope = {
            "method": method,
            "path": self._get_path(parsed),
            "server": ("127.0.0.1", "443" if secure else "80"),
            "scheme": "https" if secure else "http",
            "headers": headers,
        }
        if body:
            headers.append((b"content-length", str(len(body)).encode("ascii")))
            headers.append((b"content-type", content_type.encode("ascii")))
            scope["_body_file"] = FakePayload(body)
        follow = extra.pop("follow", None)
        if follow is not None:
            scope["follow"] = follow
        query_string = extra.pop("QUERY_STRING", None)
        if query_string:
            scope["query_string"] = query_string
        # Whatever is left in ``extra`` is sent as a header.
        headers.extend(
            [
                (key.lower().encode("ascii"), value.encode("latin1"))
                for key, value in extra.items()
            ]
        )
        # If QUERY_STRING is absent or empty, we want to extract it from the
        # URL.
        if not scope.get("query_string"):
            scope["query_string"] = parsed[4]
        return self.request(**scope)

641 

642 

class ClientMixin:
    """
    Mixin with common methods between Client and AsyncClient.
    """

    def store_exc_info(self, **kwargs):
        """Store exceptions when they are generated by a view."""
        self.exc_info = sys.exc_info()

    def check_exception(self, response):
        """
        Look for a signaled exception, clear the current context exception
        data, re-raise the signaled exception, and clear the signaled exception
        from the local cache.
        """
        response.exc_info = self.exc_info
        if self.exc_info:
            _, exc_value, _ = self.exc_info
            self.exc_info = None
            if self.raise_request_exception:
                raise exc_value

    @property
    def session(self):
        """Return the current session variables."""
        engine = import_module(settings.SESSION_ENGINE)
        cookie = self.cookies.get(settings.SESSION_COOKIE_NAME)
        if cookie:
            return engine.SessionStore(cookie.value)
        # No session cookie yet: create and save a new session, and remember
        # its key in the cookie jar.
        session = engine.SessionStore()
        session.save()
        self.cookies[settings.SESSION_COOKIE_NAME] = session.session_key
        return session

    def login(self, **credentials):
        """
        Set the Factory to appear as if it has successfully logged into a site.

        Return True if login is possible or False if the provided credentials
        are incorrect.
        """
        from django.contrib.auth import authenticate

        user = authenticate(**credentials)
        if user:
            self._login(user)
            return True
        return False

    def force_login(self, user, backend=None):
        """
        Log ``user`` in without checking credentials.

        When ``backend`` is not given, the first entry of
        settings.AUTHENTICATION_BACKENDS whose backend has a ``get_user``
        attribute is used.
        """

        def get_backend():
            from django.contrib.auth import load_backend

            for backend_path in settings.AUTHENTICATION_BACKENDS:
                backend = load_backend(backend_path)
                if hasattr(backend, "get_user"):
                    return backend_path

        if backend is None:
            backend = get_backend()
        user.backend = backend
        self._login(user, backend)

    def _login(self, user, backend=None):
        """Log ``user`` in on a fake request and store the session cookie."""
        from django.contrib.auth import login

        # Create a fake request to store login details.
        request = HttpRequest()
        if self.session:
            request.session = self.session
        else:
            engine = import_module(settings.SESSION_ENGINE)
            request.session = engine.SessionStore()
        login(request, user, backend)
        # Save the session values.
        request.session.save()
        # Set the cookie to represent the session.
        session_cookie = settings.SESSION_COOKIE_NAME
        self.cookies[session_cookie] = request.session.session_key
        cookie_data = {
            "max-age": None,
            "path": "/",
            "domain": settings.SESSION_COOKIE_DOMAIN,
            "secure": settings.SESSION_COOKIE_SECURE or None,
            "expires": None,
        }
        self.cookies[session_cookie].update(cookie_data)

    def logout(self):
        """Log out the user by removing the cookies and session object."""
        from django.contrib.auth import get_user, logout

        request = HttpRequest()
        if self.session:
            request.session = self.session
            request.user = get_user(request)
        else:
            engine = import_module(settings.SESSION_ENGINE)
            request.session = engine.SessionStore()
        logout(request)
        self.cookies = SimpleCookie()

    def _parse_json(self, response, **extra):
        """
        Return the response body decoded as JSON.

        The result is cached on ``response._json``. ``extra`` is forwarded to
        json.loads(). Raise ValueError when the Content-Type header is missing
        or is not a JSON content type.
        """
        if not hasattr(response, "_json"):
            content_type = response.get("Content-Type")
            # The header may be absent, in which case get() yields None;
            # matching None would raise TypeError instead of ValueError.
            if not JSON_CONTENT_TYPE_RE.match(content_type or ""):
                raise ValueError(
                    'Content-Type header is "%s", not "application/json"'
                    % content_type
                )
            response._json = json.loads(
                response.content.decode(response.charset), **extra
            )
        return response._json

756 

757 

758class Client(ClientMixin, RequestFactory): 

759 """ 

760 A class that can act as a client for testing purposes. 

761 

762 It allows the user to compose GET and POST requests, and 

763 obtain the response that the server gave to those requests. 

764 The server Response objects are annotated with the details 

765 of the contexts and templates that were rendered during the 

766 process of serving the request. 

767 

768 Client objects are stateful - they will retain cookie (and 

769 thus session) details for the lifetime of the Client instance. 

770 

771 This is not intended as a replacement for Twill/Selenium or 

772 the like - it is here to allow testing against the 

773 contexts and templates produced by a view, rather than the 

774 HTML rendered to the end-user. 

775 """ 

776 

777 def __init__( 

778 self, enforce_csrf_checks=False, raise_request_exception=True, **defaults 

779 ): 

780 super().__init__(**defaults) 

781 self.handler = ClientHandler(enforce_csrf_checks) 

782 self.raise_request_exception = raise_request_exception 

783 self.exc_info = None 

784 self.extra = None 

785 

786 def request(self, **request): 

787 """ 

788 The master request method. Compose the environment dictionary and pass 

789 to the handler, return the result of the handler. Assume defaults for 

790 the query environment, which can be overridden using the arguments to 

791 the request. 

792 """ 

793 environ = self._base_environ(**request) 

794 

795 # Curry a data dictionary into an instance of the template renderer 

796 # callback function. 

797 data = {} 

798 on_template_render = partial(store_rendered_templates, data) 

799 signal_uid = "template-render-%s" % id(request) 

800 signals.template_rendered.connect(on_template_render, dispatch_uid=signal_uid) 

801 # Capture exceptions created by the handler. 

802 exception_uid = "request-exception-%s" % id(request) 

803 got_request_exception.connect(self.store_exc_info, dispatch_uid=exception_uid) 

804 try: 

805 response = self.handler(environ) 

806 finally: 

807 signals.template_rendered.disconnect(dispatch_uid=signal_uid) 

808 got_request_exception.disconnect(dispatch_uid=exception_uid) 

809 # Check for signaled exceptions. 

810 self.check_exception(response) 

811 # Save the client and request that stimulated the response. 

812 response.client = self 

813 response.request = request 

814 # Add any rendered template detail to the response. 

815 response.templates = data.get("templates", []) 

816 response.context = data.get("context") 

817 response.json = partial(self._parse_json, response) 

818 # Attach the ResolverMatch instance to the response. 

819 urlconf = getattr(response.wsgi_request, "urlconf", None) 

820 response.resolver_match = SimpleLazyObject( 820 ↛ exitline 820 didn't jump to the function exit

821 lambda: resolve(request["PATH_INFO"], urlconf=urlconf), 

822 ) 

823 # Flatten a single context. Not really necessary anymore thanks to the 

824 # __getattr__ flattening in ContextList, but has some edge case 

825 # backwards compatibility implications. 

826 if response.context and len(response.context) == 1: 826 ↛ 827line 826 didn't jump to line 827, because the condition on line 826 was never true

827 response.context = response.context[0] 

828 # Update persistent cookie data. 

829 if response.cookies: 829 ↛ 830line 829 didn't jump to line 830, because the condition on line 829 was never true

830 self.cookies.update(response.cookies) 

831 return response 

832 

833 def get(self, path, data=None, follow=False, secure=False, **extra): 

834 """Request a response from the server using GET.""" 

835 self.extra = extra 

836 response = super().get(path, data=data, secure=secure, **extra) 

837 if follow: 

838 response = self._handle_redirects(response, data=data, **extra) 

839 return response 

840 

841 def post( 

842 self, 

843 path, 

844 data=None, 

845 content_type=MULTIPART_CONTENT, 

846 follow=False, 

847 secure=False, 

848 **extra, 

849 ): 

850 """Request a response from the server using POST.""" 

851 self.extra = extra 

852 response = super().post( 

853 path, data=data, content_type=content_type, secure=secure, **extra 

854 ) 

855 if follow: 

856 response = self._handle_redirects( 

857 response, data=data, content_type=content_type, **extra 

858 ) 

859 return response 

860 

861 def head(self, path, data=None, follow=False, secure=False, **extra): 

862 """Request a response from the server using HEAD.""" 

863 self.extra = extra 

864 response = super().head(path, data=data, secure=secure, **extra) 

865 if follow: 

866 response = self._handle_redirects(response, data=data, **extra) 

867 return response 

868 

869 def options( 

870 self, 

871 path, 

872 data="", 

873 content_type="application/octet-stream", 

874 follow=False, 

875 secure=False, 

876 **extra, 

877 ): 

878 """Request a response from the server using OPTIONS.""" 

879 self.extra = extra 

880 response = super().options( 

881 path, data=data, content_type=content_type, secure=secure, **extra 

882 ) 

883 if follow: 

884 response = self._handle_redirects( 

885 response, data=data, content_type=content_type, **extra 

886 ) 

887 return response 

888 

def put(
    self,
    path,
    data="",
    content_type="application/octet-stream",
    follow=False,
    secure=False,
    **extra,
):
    """
    Issue a PUT request for ``path`` and return the response.

    The extra keyword arguments are remembered on ``self.extra``. When
    ``follow`` is true, the response is passed through
    ``_handle_redirects()`` together with the original ``data`` and
    ``content_type``, and the final response is returned instead.
    """
    self.extra = extra
    initial = super().put(
        path, data=data, content_type=content_type, secure=secure, **extra
    )
    if not follow:
        return initial
    return self._handle_redirects(
        initial, data=data, content_type=content_type, **extra
    )

908 

def patch(
    self,
    path,
    data="",
    content_type="application/octet-stream",
    follow=False,
    secure=False,
    **extra,
):
    """
    Issue a PATCH request for ``path`` and return the response.

    The extra keyword arguments are remembered on ``self.extra``. When
    ``follow`` is true, the response is passed through
    ``_handle_redirects()`` together with the original ``data`` and
    ``content_type``, and the final response is returned instead.
    """
    self.extra = extra
    initial = super().patch(
        path, data=data, content_type=content_type, secure=secure, **extra
    )
    if not follow:
        return initial
    return self._handle_redirects(
        initial, data=data, content_type=content_type, **extra
    )

928 

def delete(
    self,
    path,
    data="",
    content_type="application/octet-stream",
    follow=False,
    secure=False,
    **extra,
):
    """
    Issue a DELETE request for ``path`` and return the response.

    The extra keyword arguments are remembered on ``self.extra``. When
    ``follow`` is true, the response is passed through
    ``_handle_redirects()`` together with the original ``data`` and
    ``content_type``, and the final response is returned instead.
    """
    self.extra = extra
    initial = super().delete(
        path, data=data, content_type=content_type, secure=secure, **extra
    )
    if not follow:
        return initial
    return self._handle_redirects(
        initial, data=data, content_type=content_type, **extra
    )

948 

def trace(self, path, data="", follow=False, secure=False, **extra):
    """
    Issue a TRACE request for ``path`` and return the response.

    The extra keyword arguments are remembered on ``self.extra``. When
    ``follow`` is true, the response is passed through
    ``_handle_redirects()`` and the final response is returned instead.
    """
    self.extra = extra
    initial = super().trace(path, data=data, secure=secure, **extra)
    if not follow:
        return initial
    return self._handle_redirects(initial, data=data, **extra)

956 

957 def _handle_redirects(self, response, data="", content_type="", **extra): 

958 """ 

959 Follow any redirects by requesting responses from the server using GET. 

960 """ 

961 response.redirect_chain = [] 

962 redirect_status_codes = ( 

963 HTTPStatus.MOVED_PERMANENTLY, 

964 HTTPStatus.FOUND, 

965 HTTPStatus.SEE_OTHER, 

966 HTTPStatus.TEMPORARY_REDIRECT, 

967 HTTPStatus.PERMANENT_REDIRECT, 

968 ) 

969 while response.status_code in redirect_status_codes: 

970 response_url = response.url 

971 redirect_chain = response.redirect_chain 

972 redirect_chain.append((response_url, response.status_code)) 

973 

974 url = urlsplit(response_url) 

975 if url.scheme: 

976 extra["wsgi.url_scheme"] = url.scheme 

977 if url.hostname: 

978 extra["SERVER_NAME"] = url.hostname 

979 if url.port: 

980 extra["SERVER_PORT"] = str(url.port) 

981 

982 path = url.path 

983 # RFC 2616: bare domains without path are treated as the root. 

984 if not path and url.netloc: 

985 path = "/" 

986 # Prepend the request path to handle relative path redirects 

987 if not path.startswith("/"): 

988 path = urljoin(response.request["PATH_INFO"], path) 

989 

990 if response.status_code in ( 

991 HTTPStatus.TEMPORARY_REDIRECT, 

992 HTTPStatus.PERMANENT_REDIRECT, 

993 ): 

994 # Preserve request method and query string (if needed) 

995 # post-redirect for 307/308 responses. 

996 request_method = response.request["REQUEST_METHOD"].lower() 

997 if request_method not in ("get", "head"): 

998 extra["QUERY_STRING"] = url.query 

999 request_method = getattr(self, request_method) 

1000 else: 

1001 request_method = self.get 

1002 data = QueryDict(url.query) 

1003 content_type = None 

1004 

1005 response = request_method( 

1006 path, data=data, content_type=content_type, follow=False, **extra 

1007 ) 

1008 response.redirect_chain = redirect_chain 

1009 

1010 if redirect_chain[-1] in redirect_chain[:-1]: 

1011 # Check that we're not redirecting to somewhere we've already 

1012 # been to, to prevent loops. 

1013 raise RedirectCycleError( 

1014 "Redirect loop detected.", last_response=response 

1015 ) 

1016 if len(redirect_chain) > 20: 

1017 # Such a lengthy chain likely also means a loop, but one with 

1018 # a growing path, changing view, or changing query argument; 

1019 # 20 is the value of "network.http.redirection-limit" from Firefox. 

1020 raise RedirectCycleError("Too many redirects.", last_response=response) 

1021 

1022 return response 

1023 

1024 

class AsyncClient(ClientMixin, AsyncRequestFactory):
    """
    Asynchronous counterpart of Client: each request is turned into a scope
    dictionary and awaited through the async client handler.

    The "follow" argument is not supported by its request methods.
    """

    def __init__(
        self, enforce_csrf_checks=False, raise_request_exception=True, **defaults
    ):
        super().__init__(**defaults)
        self.handler = AsyncClientHandler(enforce_csrf_checks)
        self.extra = None
        self.exc_info = None
        self.raise_request_exception = raise_request_exception

    async def request(self, **request):
        """
        Build the scope for ``request``, await the handler with it and return
        the resulting response.

        The response is annotated with the client, the request keyword
        arguments, any templates and context captured while rendering, a
        ``json`` helper and a lazily resolved ``resolver_match``. Cookies set
        on the response are merged into the client's cookies. Raise
        NotImplementedError if a "follow" argument is supplied.
        """
        if "follow" in request:
            raise NotImplementedError(
                "AsyncClient request methods do not accept the follow parameter."
            )
        scope = self._base_scope(**request)

        # Template renders are collected into this dict through a signal
        # receiver. The receiver is kept in a local so that it stays alive
        # for as long as it is connected.
        captured = {}
        template_receiver = partial(store_rendered_templates, captured)
        request_id = id(request)
        template_uid = "template-render-%s" % request_id
        signals.template_rendered.connect(
            template_receiver, dispatch_uid=template_uid
        )
        # Exceptions raised while handling the request are recorded too.
        exception_uid = "request-exception-%s" % request_id
        got_request_exception.connect(
            self.store_exc_info, dispatch_uid=exception_uid
        )
        try:
            response = await self.handler(scope)
        finally:
            # Always detach both receivers, even if the handler raised.
            signals.template_rendered.disconnect(dispatch_uid=template_uid)
            got_request_exception.disconnect(dispatch_uid=exception_uid)

        # Surface any exception that was signaled during handling.
        self.check_exception(response)

        # Remember which client and request produced this response.
        response.client = self
        response.request = request
        # Expose whatever was captured during template rendering.
        response.templates = captured.get("templates", [])
        response.context = captured.get("context")
        response.json = partial(self._parse_json, response)
        # The ResolverMatch is computed only when first accessed.
        urlconf = getattr(response.asgi_request, "urlconf", None)
        response.resolver_match = SimpleLazyObject(
            lambda: resolve(request["path"], urlconf=urlconf),
        )
        # A context list holding exactly one context is replaced by that
        # context, kept for backwards compatibility.
        context = response.context
        if context and len(context) == 1:
            response.context = context[0]
        # Carry cookies over to subsequent requests.
        if response.cookies:
            self.cookies.update(response.cookies)
        return response