Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/test/client.py: 43%
463 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1import json
2import mimetypes
3import os
4import sys
5from copy import copy
6from functools import partial
7from http import HTTPStatus
8from importlib import import_module
9from io import BytesIO
10from urllib.parse import unquote_to_bytes, urljoin, urlparse, urlsplit
12from asgiref.sync import sync_to_async
14from django.conf import settings
15from django.core.handlers.asgi import ASGIRequest
16from django.core.handlers.base import BaseHandler
17from django.core.handlers.wsgi import WSGIRequest
18from django.core.serializers.json import DjangoJSONEncoder
19from django.core.signals import got_request_exception, request_finished, request_started
20from django.db import close_old_connections
21from django.http import HttpRequest, QueryDict, SimpleCookie
22from django.test import signals
23from django.test.utils import ContextList
24from django.urls import resolve
25from django.utils.encoding import force_bytes
26from django.utils.functional import SimpleLazyObject
27from django.utils.http import urlencode
28from django.utils.itercompat import is_iterable
29from django.utils.regex_helper import _lazy_re_compile
31__all__ = (
32 "AsyncClient",
33 "AsyncRequestFactory",
34 "Client",
35 "RedirectCycleError",
36 "RequestFactory",
37 "encode_file",
38 "encode_multipart",
39)
42BOUNDARY = "BoUnDaRyStRiNg"
43MULTIPART_CONTENT = "multipart/form-data; boundary=%s" % BOUNDARY
44CONTENT_TYPE_RE = _lazy_re_compile(r".*; charset=([\w\d-]+);?")
45# Structured suffix spec: https://tools.ietf.org/html/rfc6838#section-4.2.8
46JSON_CONTENT_TYPE_RE = _lazy_re_compile(r"^application\/(.+\+)?json")
49class RedirectCycleError(Exception):
50 """The test client has been asked to follow a redirect loop."""
52 def __init__(self, message, last_response):
53 super().__init__(message)
54 self.last_response = last_response
55 self.redirect_chain = last_response.redirect_chain
58class FakePayload:
59 """
60 A wrapper around BytesIO that restricts what can be read since data from
61 the network can't be sought and cannot be read outside of its content
62 length. This makes sure that views can't do anything under the test client
63 that wouldn't work in real life.
64 """
66 def __init__(self, content=None):
67 self.__content = BytesIO()
68 self.__len = 0
69 self.read_started = False
70 if content is not None: 70 ↛ exitline 70 didn't return from function '__init__', because the condition on line 70 was never false
71 self.write(content)
73 def __len__(self):
74 return self.__len
76 def read(self, num_bytes=None):
77 if not self.read_started: 77 ↛ 80line 77 didn't jump to line 80, because the condition on line 77 was never false
78 self.__content.seek(0)
79 self.read_started = True
80 if num_bytes is None: 80 ↛ 81line 80 didn't jump to line 81, because the condition on line 80 was never true
81 num_bytes = self.__len or 0
82 assert (
83 self.__len >= num_bytes
84 ), "Cannot read more than the available bytes from the HTTP incoming data."
85 content = self.__content.read(num_bytes)
86 self.__len -= num_bytes
87 return content
89 def write(self, content):
90 if self.read_started: 90 ↛ 91line 90 didn't jump to line 91, because the condition on line 90 was never true
91 raise ValueError("Unable to write a payload after it's been read")
92 content = force_bytes(content)
93 self.__content.write(content)
94 self.__len += len(content)
97def closing_iterator_wrapper(iterable, close):
98 try:
99 yield from iterable
100 finally:
101 request_finished.disconnect(close_old_connections)
102 close() # will fire request_finished
103 request_finished.connect(close_old_connections)
106def conditional_content_removal(request, response):
107 """
108 Simulate the behavior of most web servers by removing the content of
109 responses for HEAD requests, 1xx, 204, and 304 responses. Ensure
110 compliance with RFC 7230, section 3.3.3.
111 """
112 if 100 <= response.status_code < 200 or response.status_code in (204, 304):
113 if response.streaming: 113 ↛ 114line 113 didn't jump to line 114, because the condition on line 113 was never true
114 response.streaming_content = []
115 else:
116 response.content = b""
117 if request.method == "HEAD": 117 ↛ 118line 117 didn't jump to line 118, because the condition on line 117 was never true
118 if response.streaming:
119 response.streaming_content = []
120 else:
121 response.content = b""
122 return response
125class ClientHandler(BaseHandler):
126 """
127 An HTTP Handler that can be used for testing purposes. Use the WSGI
128 interface to compose requests, but return the raw HttpResponse object with
129 the originating WSGIRequest attached to its ``wsgi_request`` attribute.
130 """
132 def __init__(self, enforce_csrf_checks=True, *args, **kwargs):
133 self.enforce_csrf_checks = enforce_csrf_checks
134 super().__init__(*args, **kwargs)
136 def __call__(self, environ):
137 # Set up middleware if needed. We couldn't do this earlier, because
138 # settings weren't available.
139 if self._middleware_chain is None:
140 self.load_middleware()
142 request_started.disconnect(close_old_connections)
143 request_started.send(sender=self.__class__, environ=environ)
144 request_started.connect(close_old_connections)
145 request = WSGIRequest(environ)
146 # sneaky little hack so that we can easily get round
147 # CsrfViewMiddleware. This makes life easier, and is probably
148 # required for backwards compatibility with external tests against
149 # admin views.
150 request._dont_enforce_csrf_checks = not self.enforce_csrf_checks
152 # Request goes through middleware.
153 response = self.get_response(request)
155 # Simulate behaviors of most web servers.
156 conditional_content_removal(request, response)
158 # Attach the originating request to the response so that it could be
159 # later retrieved.
160 response.wsgi_request = request
162 # Emulate a WSGI server by calling the close method on completion.
163 if response.streaming: 163 ↛ 164line 163 didn't jump to line 164, because the condition on line 163 was never true
164 response.streaming_content = closing_iterator_wrapper(
165 response.streaming_content, response.close
166 )
167 else:
168 request_finished.disconnect(close_old_connections)
169 response.close() # will fire request_finished
170 request_finished.connect(close_old_connections)
172 return response
175class AsyncClientHandler(BaseHandler):
176 """An async version of ClientHandler."""
178 def __init__(self, enforce_csrf_checks=True, *args, **kwargs):
179 self.enforce_csrf_checks = enforce_csrf_checks
180 super().__init__(*args, **kwargs)
182 async def __call__(self, scope):
183 # Set up middleware if needed. We couldn't do this earlier, because
184 # settings weren't available.
185 if self._middleware_chain is None:
186 self.load_middleware(is_async=True)
187 # Extract body file from the scope, if provided.
188 if "_body_file" in scope:
189 body_file = scope.pop("_body_file")
190 else:
191 body_file = FakePayload("")
193 request_started.disconnect(close_old_connections)
194 await sync_to_async(request_started.send, thread_sensitive=False)(
195 sender=self.__class__, scope=scope
196 )
197 request_started.connect(close_old_connections)
198 request = ASGIRequest(scope, body_file)
199 # Sneaky little hack so that we can easily get round
200 # CsrfViewMiddleware. This makes life easier, and is probably required
201 # for backwards compatibility with external tests against admin views.
202 request._dont_enforce_csrf_checks = not self.enforce_csrf_checks
203 # Request goes through middleware.
204 response = await self.get_response_async(request)
205 # Simulate behaviors of most web servers.
206 conditional_content_removal(request, response)
207 # Attach the originating ASGI request to the response so that it could
208 # be later retrieved.
209 response.asgi_request = request
210 # Emulate a server by calling the close method on completion.
211 if response.streaming:
212 response.streaming_content = await sync_to_async(
213 closing_iterator_wrapper, thread_sensitive=False
214 )(
215 response.streaming_content,
216 response.close,
217 )
218 else:
219 request_finished.disconnect(close_old_connections)
220 # Will fire request_finished.
221 await sync_to_async(response.close, thread_sensitive=False)()
222 request_finished.connect(close_old_connections)
223 return response
226def store_rendered_templates(store, signal, sender, template, context, **kwargs):
227 """
228 Store templates and contexts that are rendered.
230 The context is copied so that it is an accurate representation at the time
231 of rendering.
232 """
233 store.setdefault("templates", []).append(template)
234 if "context" not in store:
235 store["context"] = ContextList()
236 store["context"].append(copy(context))
239def encode_multipart(boundary, data):
240 """
241 Encode multipart POST data from a dictionary of form values.
243 The key will be used as the form data name; the value will be transmitted
244 as content. If the value is a file, the contents of the file will be sent
245 as an application/octet-stream; otherwise, str(value) will be sent.
246 """
247 lines = []
249 def to_bytes(s):
250 return force_bytes(s, settings.DEFAULT_CHARSET)
252 # Not by any means perfect, but good enough for our purposes.
253 def is_file(thing):
254 return hasattr(thing, "read") and callable(thing.read)
256 # Each bit of the multipart form data could be either a form value or a
257 # file, or a *list* of form values and/or files. Remember that HTTP field
258 # names can be duplicated!
259 for (key, value) in data.items():
260 if value is None: 260 ↛ 261line 260 didn't jump to line 261, because the condition on line 260 was never true
261 raise TypeError(
262 "Cannot encode None for key '%s' as POST data. Did you mean "
263 "to pass an empty string or omit the value?" % key
264 )
265 elif is_file(value):
266 lines.extend(encode_file(boundary, key, value))
267 elif not isinstance(value, str) and is_iterable(value): 267 ↛ 268line 267 didn't jump to line 268, because the condition on line 267 was never true
268 for item in value:
269 if is_file(item):
270 lines.extend(encode_file(boundary, key, item))
271 else:
272 lines.extend(
273 to_bytes(val)
274 for val in [
275 "--%s" % boundary,
276 'Content-Disposition: form-data; name="%s"' % key,
277 "",
278 item,
279 ]
280 )
281 else:
282 lines.extend(
283 to_bytes(val)
284 for val in [
285 "--%s" % boundary,
286 'Content-Disposition: form-data; name="%s"' % key,
287 "",
288 value,
289 ]
290 )
292 lines.extend(
293 [
294 to_bytes("--%s--" % boundary),
295 b"",
296 ]
297 )
298 return b"\r\n".join(lines)
301def encode_file(boundary, key, file):
302 def to_bytes(s):
303 return force_bytes(s, settings.DEFAULT_CHARSET)
305 # file.name might not be a string. For example, it's an int for
306 # tempfile.TemporaryFile().
307 file_has_string_name = hasattr(file, "name") and isinstance(file.name, str)
308 filename = os.path.basename(file.name) if file_has_string_name else ""
310 if hasattr(file, "content_type"): 310 ↛ 311line 310 didn't jump to line 311, because the condition on line 310 was never true
311 content_type = file.content_type
312 elif filename: 312 ↛ 315line 312 didn't jump to line 315, because the condition on line 312 was never false
313 content_type = mimetypes.guess_type(filename)[0]
314 else:
315 content_type = None
317 if content_type is None: 317 ↛ 318line 317 didn't jump to line 318, because the condition on line 317 was never true
318 content_type = "application/octet-stream"
319 filename = filename or key
320 return [
321 to_bytes("--%s" % boundary),
322 to_bytes(
323 'Content-Disposition: form-data; name="%s"; filename="%s"' % (key, filename)
324 ),
325 to_bytes("Content-Type: %s" % content_type),
326 b"",
327 to_bytes(file.read()),
328 ]
331class RequestFactory:
332 """
333 Class that lets you create mock Request objects for use in testing.
335 Usage:
337 rf = RequestFactory()
338 get_request = rf.get('/hello/')
339 post_request = rf.post('/submit/', {'foo': 'bar'})
341 Once you have a request object you can pass it to any view function,
342 just as if that view had been hooked up using a URLconf.
343 """
345 def __init__(self, *, json_encoder=DjangoJSONEncoder, **defaults):
346 self.json_encoder = json_encoder
347 self.defaults = defaults
348 self.cookies = SimpleCookie()
349 self.errors = BytesIO()
351 def _base_environ(self, **request):
352 """
353 The base environment for a request.
354 """
355 # This is a minimal valid WSGI environ dictionary, plus:
356 # - HTTP_COOKIE: for cookie support,
357 # - REMOTE_ADDR: often useful, see #8551.
358 # See https://www.python.org/dev/peps/pep-3333/#environ-variables
359 return {
360 "HTTP_COOKIE": "; ".join(
361 sorted(
362 "%s=%s" % (morsel.key, morsel.coded_value)
363 for morsel in self.cookies.values()
364 )
365 ),
366 "PATH_INFO": "/",
367 "REMOTE_ADDR": "127.0.0.1",
368 "REQUEST_METHOD": "GET",
369 "SCRIPT_NAME": "",
370 "SERVER_NAME": "testserver",
371 "SERVER_PORT": "80",
372 "SERVER_PROTOCOL": "HTTP/1.1",
373 "wsgi.version": (1, 0),
374 "wsgi.url_scheme": "http",
375 "wsgi.input": FakePayload(b""),
376 "wsgi.errors": self.errors,
377 "wsgi.multiprocess": True,
378 "wsgi.multithread": False,
379 "wsgi.run_once": False,
380 **self.defaults,
381 **request,
382 }
384 def request(self, **request):
385 "Construct a generic request object."
386 return WSGIRequest(self._base_environ(**request))
388 def _encode_data(self, data, content_type):
389 if content_type is MULTIPART_CONTENT:
390 return encode_multipart(BOUNDARY, data)
391 else:
392 # Encode the content so that the byte representation is correct.
393 match = CONTENT_TYPE_RE.match(content_type)
394 if match:
395 charset = match[1]
396 else:
397 charset = settings.DEFAULT_CHARSET
398 return force_bytes(data, encoding=charset)
400 def _encode_json(self, data, content_type):
401 """
402 Return encoded JSON if data is a dict, list, or tuple and content_type
403 is application/json.
404 """
405 should_encode = JSON_CONTENT_TYPE_RE.match(content_type) and isinstance(
406 data, (dict, list, tuple)
407 )
408 return json.dumps(data, cls=self.json_encoder) if should_encode else data
410 def _get_path(self, parsed):
411 path = parsed.path
412 # If there are parameters, add them
413 if parsed.params: 413 ↛ 414line 413 didn't jump to line 414, because the condition on line 413 was never true
414 path += ";" + parsed.params
415 path = unquote_to_bytes(path)
416 # Replace the behavior where non-ASCII values in the WSGI environ are
417 # arbitrarily decoded with ISO-8859-1.
418 # Refs comment in `get_bytes_from_wsgi()`.
419 return path.decode("iso-8859-1")
421 def get(self, path, data=None, secure=False, **extra):
422 """Construct a GET request."""
423 data = {} if data is None else data
424 return self.generic(
425 "GET",
426 path,
427 secure=secure,
428 **{
429 "QUERY_STRING": urlencode(data, doseq=True),
430 **extra,
431 },
432 )
434 def post(
435 self, path, data=None, content_type=MULTIPART_CONTENT, secure=False, **extra
436 ):
437 """Construct a POST request."""
438 data = self._encode_json({} if data is None else data, content_type)
439 post_data = self._encode_data(data, content_type)
441 return self.generic(
442 "POST", path, post_data, content_type, secure=secure, **extra
443 )
445 def head(self, path, data=None, secure=False, **extra):
446 """Construct a HEAD request."""
447 data = {} if data is None else data
448 return self.generic(
449 "HEAD",
450 path,
451 secure=secure,
452 **{
453 "QUERY_STRING": urlencode(data, doseq=True),
454 **extra,
455 },
456 )
458 def trace(self, path, secure=False, **extra):
459 """Construct a TRACE request."""
460 return self.generic("TRACE", path, secure=secure, **extra)
462 def options(
463 self,
464 path,
465 data="",
466 content_type="application/octet-stream",
467 secure=False,
468 **extra,
469 ):
470 "Construct an OPTIONS request."
471 return self.generic("OPTIONS", path, data, content_type, secure=secure, **extra)
473 def put(
474 self,
475 path,
476 data="",
477 content_type="application/octet-stream",
478 secure=False,
479 **extra,
480 ):
481 """Construct a PUT request."""
482 data = self._encode_json(data, content_type)
483 return self.generic("PUT", path, data, content_type, secure=secure, **extra)
485 def patch(
486 self,
487 path,
488 data="",
489 content_type="application/octet-stream",
490 secure=False,
491 **extra,
492 ):
493 """Construct a PATCH request."""
494 data = self._encode_json(data, content_type)
495 return self.generic("PATCH", path, data, content_type, secure=secure, **extra)
497 def delete(
498 self,
499 path,
500 data="",
501 content_type="application/octet-stream",
502 secure=False,
503 **extra,
504 ):
505 """Construct a DELETE request."""
506 data = self._encode_json(data, content_type)
507 return self.generic("DELETE", path, data, content_type, secure=secure, **extra)
509 def generic(
510 self,
511 method,
512 path,
513 data="",
514 content_type="application/octet-stream",
515 secure=False,
516 **extra,
517 ):
518 """Construct an arbitrary HTTP request."""
519 parsed = urlparse(str(path)) # path can be lazy
520 data = force_bytes(data, settings.DEFAULT_CHARSET)
521 r = {
522 "PATH_INFO": self._get_path(parsed),
523 "REQUEST_METHOD": method,
524 "SERVER_PORT": "443" if secure else "80",
525 "wsgi.url_scheme": "https" if secure else "http",
526 }
527 if data:
528 r.update(
529 {
530 "CONTENT_LENGTH": str(len(data)),
531 "CONTENT_TYPE": content_type,
532 "wsgi.input": FakePayload(data),
533 }
534 )
535 r.update(extra)
536 # If QUERY_STRING is absent or empty, we want to extract it from the URL.
537 if not r.get("QUERY_STRING"):
538 # WSGI requires latin-1 encoded strings. See get_path_info().
539 query_string = parsed[4].encode().decode("iso-8859-1")
540 r["QUERY_STRING"] = query_string
541 return self.request(**r)
544class AsyncRequestFactory(RequestFactory):
545 """
546 Class that lets you create mock ASGI-like Request objects for use in
547 testing. Usage:
549 rf = AsyncRequestFactory()
550 get_request = await rf.get('/hello/')
551 post_request = await rf.post('/submit/', {'foo': 'bar'})
553 Once you have a request object you can pass it to any view function,
554 including synchronous ones. The reason we have a separate class here is:
555 a) this makes ASGIRequest subclasses, and
556 b) AsyncTestClient can subclass it.
557 """
559 def _base_scope(self, **request):
560 """The base scope for a request."""
561 # This is a minimal valid ASGI scope, plus:
562 # - headers['cookie'] for cookie support,
563 # - 'client' often useful, see #8551.
564 scope = {
565 "asgi": {"version": "3.0"},
566 "type": "http",
567 "http_version": "1.1",
568 "client": ["127.0.0.1", 0],
569 "server": ("testserver", "80"),
570 "scheme": "http",
571 "method": "GET",
572 "headers": [],
573 **self.defaults,
574 **request,
575 }
576 scope["headers"].append(
577 (
578 b"cookie",
579 b"; ".join(
580 sorted(
581 ("%s=%s" % (morsel.key, morsel.coded_value)).encode("ascii")
582 for morsel in self.cookies.values()
583 )
584 ),
585 )
586 )
587 return scope
589 def request(self, **request):
590 """Construct a generic request object."""
591 # This is synchronous, which means all methods on this class are.
592 # AsyncClient, however, has an async request function, which makes all
593 # its methods async.
594 if "_body_file" in request:
595 body_file = request.pop("_body_file")
596 else:
597 body_file = FakePayload("")
598 return ASGIRequest(self._base_scope(**request), body_file)
600 def generic(
601 self,
602 method,
603 path,
604 data="",
605 content_type="application/octet-stream",
606 secure=False,
607 **extra,
608 ):
609 """Construct an arbitrary HTTP request."""
610 parsed = urlparse(str(path)) # path can be lazy.
611 data = force_bytes(data, settings.DEFAULT_CHARSET)
612 s = {
613 "method": method,
614 "path": self._get_path(parsed),
615 "server": ("127.0.0.1", "443" if secure else "80"),
616 "scheme": "https" if secure else "http",
617 "headers": [(b"host", b"testserver")],
618 }
619 if data:
620 s["headers"].extend(
621 [
622 (b"content-length", str(len(data)).encode("ascii")),
623 (b"content-type", content_type.encode("ascii")),
624 ]
625 )
626 s["_body_file"] = FakePayload(data)
627 follow = extra.pop("follow", None)
628 if follow is not None:
629 s["follow"] = follow
630 if query_string := extra.pop("QUERY_STRING", None):
631 s["query_string"] = query_string
632 s["headers"] += [
633 (key.lower().encode("ascii"), value.encode("latin1"))
634 for key, value in extra.items()
635 ]
636 # If QUERY_STRING is absent or empty, we want to extract it from the
637 # URL.
638 if not s.get("query_string"):
639 s["query_string"] = parsed[4]
640 return self.request(**s)
643class ClientMixin:
644 """
645 Mixin with common methods between Client and AsyncClient.
646 """
648 def store_exc_info(self, **kwargs):
649 """Store exceptions when they are generated by a view."""
650 self.exc_info = sys.exc_info()
652 def check_exception(self, response):
653 """
654 Look for a signaled exception, clear the current context exception
655 data, re-raise the signaled exception, and clear the signaled exception
656 from the local cache.
657 """
658 response.exc_info = self.exc_info
659 if self.exc_info: 659 ↛ 660line 659 didn't jump to line 660, because the condition on line 659 was never true
660 _, exc_value, _ = self.exc_info
661 self.exc_info = None
662 if self.raise_request_exception:
663 raise exc_value
665 @property
666 def session(self):
667 """Return the current session variables."""
668 engine = import_module(settings.SESSION_ENGINE)
669 cookie = self.cookies.get(settings.SESSION_COOKIE_NAME)
670 if cookie:
671 return engine.SessionStore(cookie.value)
672 session = engine.SessionStore()
673 session.save()
674 self.cookies[settings.SESSION_COOKIE_NAME] = session.session_key
675 return session
677 def login(self, **credentials):
678 """
679 Set the Factory to appear as if it has successfully logged into a site.
681 Return True if login is possible or False if the provided credentials
682 are incorrect.
683 """
684 from django.contrib.auth import authenticate
686 user = authenticate(**credentials)
687 if user:
688 self._login(user)
689 return True
690 return False
692 def force_login(self, user, backend=None):
693 def get_backend():
694 from django.contrib.auth import load_backend
696 for backend_path in settings.AUTHENTICATION_BACKENDS:
697 backend = load_backend(backend_path)
698 if hasattr(backend, "get_user"):
699 return backend_path
701 if backend is None:
702 backend = get_backend()
703 user.backend = backend
704 self._login(user, backend)
706 def _login(self, user, backend=None):
707 from django.contrib.auth import login
709 # Create a fake request to store login details.
710 request = HttpRequest()
711 if self.session:
712 request.session = self.session
713 else:
714 engine = import_module(settings.SESSION_ENGINE)
715 request.session = engine.SessionStore()
716 login(request, user, backend)
717 # Save the session values.
718 request.session.save()
719 # Set the cookie to represent the session.
720 session_cookie = settings.SESSION_COOKIE_NAME
721 self.cookies[session_cookie] = request.session.session_key
722 cookie_data = {
723 "max-age": None,
724 "path": "/",
725 "domain": settings.SESSION_COOKIE_DOMAIN,
726 "secure": settings.SESSION_COOKIE_SECURE or None,
727 "expires": None,
728 }
729 self.cookies[session_cookie].update(cookie_data)
731 def logout(self):
732 """Log out the user by removing the cookies and session object."""
733 from django.contrib.auth import get_user, logout
735 request = HttpRequest()
736 if self.session:
737 request.session = self.session
738 request.user = get_user(request)
739 else:
740 engine = import_module(settings.SESSION_ENGINE)
741 request.session = engine.SessionStore()
742 logout(request)
743 self.cookies = SimpleCookie()
745 def _parse_json(self, response, **extra):
746 if not hasattr(response, "_json"): 746 ↛ 755line 746 didn't jump to line 755, because the condition on line 746 was never false
747 if not JSON_CONTENT_TYPE_RE.match(response.get("Content-Type")): 747 ↛ 748line 747 didn't jump to line 748, because the condition on line 747 was never true
748 raise ValueError(
749 'Content-Type header is "%s", not "application/json"'
750 % response.get("Content-Type")
751 )
752 response._json = json.loads(
753 response.content.decode(response.charset), **extra
754 )
755 return response._json
758class Client(ClientMixin, RequestFactory):
759 """
760 A class that can act as a client for testing purposes.
762 It allows the user to compose GET and POST requests, and
763 obtain the response that the server gave to those requests.
764 The server Response objects are annotated with the details
765 of the contexts and templates that were rendered during the
766 process of serving the request.
768 Client objects are stateful - they will retain cookie (and
769 thus session) details for the lifetime of the Client instance.
771 This is not intended as a replacement for Twill/Selenium or
772 the like - it is here to allow testing against the
773 contexts and templates produced by a view, rather than the
774 HTML rendered to the end-user.
775 """
777 def __init__(
778 self, enforce_csrf_checks=False, raise_request_exception=True, **defaults
779 ):
780 super().__init__(**defaults)
781 self.handler = ClientHandler(enforce_csrf_checks)
782 self.raise_request_exception = raise_request_exception
783 self.exc_info = None
784 self.extra = None
786 def request(self, **request):
787 """
788 The master request method. Compose the environment dictionary and pass
789 to the handler, return the result of the handler. Assume defaults for
790 the query environment, which can be overridden using the arguments to
791 the request.
792 """
793 environ = self._base_environ(**request)
795 # Curry a data dictionary into an instance of the template renderer
796 # callback function.
797 data = {}
798 on_template_render = partial(store_rendered_templates, data)
799 signal_uid = "template-render-%s" % id(request)
800 signals.template_rendered.connect(on_template_render, dispatch_uid=signal_uid)
801 # Capture exceptions created by the handler.
802 exception_uid = "request-exception-%s" % id(request)
803 got_request_exception.connect(self.store_exc_info, dispatch_uid=exception_uid)
804 try:
805 response = self.handler(environ)
806 finally:
807 signals.template_rendered.disconnect(dispatch_uid=signal_uid)
808 got_request_exception.disconnect(dispatch_uid=exception_uid)
809 # Check for signaled exceptions.
810 self.check_exception(response)
811 # Save the client and request that stimulated the response.
812 response.client = self
813 response.request = request
814 # Add any rendered template detail to the response.
815 response.templates = data.get("templates", [])
816 response.context = data.get("context")
817 response.json = partial(self._parse_json, response)
818 # Attach the ResolverMatch instance to the response.
819 urlconf = getattr(response.wsgi_request, "urlconf", None)
820 response.resolver_match = SimpleLazyObject( 820 ↛ exitline 820 didn't jump to the function exit
821 lambda: resolve(request["PATH_INFO"], urlconf=urlconf),
822 )
823 # Flatten a single context. Not really necessary anymore thanks to the
824 # __getattr__ flattening in ContextList, but has some edge case
825 # backwards compatibility implications.
826 if response.context and len(response.context) == 1: 826 ↛ 827line 826 didn't jump to line 827, because the condition on line 826 was never true
827 response.context = response.context[0]
828 # Update persistent cookie data.
829 if response.cookies: 829 ↛ 830line 829 didn't jump to line 830, because the condition on line 829 was never true
830 self.cookies.update(response.cookies)
831 return response
833 def get(self, path, data=None, follow=False, secure=False, **extra):
834 """Request a response from the server using GET."""
835 self.extra = extra
836 response = super().get(path, data=data, secure=secure, **extra)
837 if follow:
838 response = self._handle_redirects(response, data=data, **extra)
839 return response
841 def post(
842 self,
843 path,
844 data=None,
845 content_type=MULTIPART_CONTENT,
846 follow=False,
847 secure=False,
848 **extra,
849 ):
850 """Request a response from the server using POST."""
851 self.extra = extra
852 response = super().post(
853 path, data=data, content_type=content_type, secure=secure, **extra
854 )
855 if follow:
856 response = self._handle_redirects(
857 response, data=data, content_type=content_type, **extra
858 )
859 return response
861 def head(self, path, data=None, follow=False, secure=False, **extra):
862 """Request a response from the server using HEAD."""
863 self.extra = extra
864 response = super().head(path, data=data, secure=secure, **extra)
865 if follow:
866 response = self._handle_redirects(response, data=data, **extra)
867 return response
869 def options(
870 self,
871 path,
872 data="",
873 content_type="application/octet-stream",
874 follow=False,
875 secure=False,
876 **extra,
877 ):
878 """Request a response from the server using OPTIONS."""
879 self.extra = extra
880 response = super().options(
881 path, data=data, content_type=content_type, secure=secure, **extra
882 )
883 if follow:
884 response = self._handle_redirects(
885 response, data=data, content_type=content_type, **extra
886 )
887 return response
889 def put(
890 self,
891 path,
892 data="",
893 content_type="application/octet-stream",
894 follow=False,
895 secure=False,
896 **extra,
897 ):
898 """Send a resource to the server using PUT."""
899 self.extra = extra
900 response = super().put(
901 path, data=data, content_type=content_type, secure=secure, **extra
902 )
903 if follow:
904 response = self._handle_redirects(
905 response, data=data, content_type=content_type, **extra
906 )
907 return response
909 def patch(
910 self,
911 path,
912 data="",
913 content_type="application/octet-stream",
914 follow=False,
915 secure=False,
916 **extra,
917 ):
918 """Send a resource to the server using PATCH."""
919 self.extra = extra
920 response = super().patch(
921 path, data=data, content_type=content_type, secure=secure, **extra
922 )
923 if follow:
924 response = self._handle_redirects(
925 response, data=data, content_type=content_type, **extra
926 )
927 return response
929 def delete(
930 self,
931 path,
932 data="",
933 content_type="application/octet-stream",
934 follow=False,
935 secure=False,
936 **extra,
937 ):
938 """Send a DELETE request to the server."""
939 self.extra = extra
940 response = super().delete(
941 path, data=data, content_type=content_type, secure=secure, **extra
942 )
943 if follow:
944 response = self._handle_redirects(
945 response, data=data, content_type=content_type, **extra
946 )
947 return response
949 def trace(self, path, data="", follow=False, secure=False, **extra):
950 """Send a TRACE request to the server."""
951 self.extra = extra
952 response = super().trace(path, data=data, secure=secure, **extra)
953 if follow:
954 response = self._handle_redirects(response, data=data, **extra)
955 return response
957 def _handle_redirects(self, response, data="", content_type="", **extra):
958 """
959 Follow any redirects by requesting responses from the server using GET.
960 """
961 response.redirect_chain = []
962 redirect_status_codes = (
963 HTTPStatus.MOVED_PERMANENTLY,
964 HTTPStatus.FOUND,
965 HTTPStatus.SEE_OTHER,
966 HTTPStatus.TEMPORARY_REDIRECT,
967 HTTPStatus.PERMANENT_REDIRECT,
968 )
969 while response.status_code in redirect_status_codes:
970 response_url = response.url
971 redirect_chain = response.redirect_chain
972 redirect_chain.append((response_url, response.status_code))
974 url = urlsplit(response_url)
975 if url.scheme:
976 extra["wsgi.url_scheme"] = url.scheme
977 if url.hostname:
978 extra["SERVER_NAME"] = url.hostname
979 if url.port:
980 extra["SERVER_PORT"] = str(url.port)
982 path = url.path
983 # RFC 2616: bare domains without path are treated as the root.
984 if not path and url.netloc:
985 path = "/"
986 # Prepend the request path to handle relative path redirects
987 if not path.startswith("/"):
988 path = urljoin(response.request["PATH_INFO"], path)
990 if response.status_code in (
991 HTTPStatus.TEMPORARY_REDIRECT,
992 HTTPStatus.PERMANENT_REDIRECT,
993 ):
994 # Preserve request method and query string (if needed)
995 # post-redirect for 307/308 responses.
996 request_method = response.request["REQUEST_METHOD"].lower()
997 if request_method not in ("get", "head"):
998 extra["QUERY_STRING"] = url.query
999 request_method = getattr(self, request_method)
1000 else:
1001 request_method = self.get
1002 data = QueryDict(url.query)
1003 content_type = None
1005 response = request_method(
1006 path, data=data, content_type=content_type, follow=False, **extra
1007 )
1008 response.redirect_chain = redirect_chain
1010 if redirect_chain[-1] in redirect_chain[:-1]:
1011 # Check that we're not redirecting to somewhere we've already
1012 # been to, to prevent loops.
1013 raise RedirectCycleError(
1014 "Redirect loop detected.", last_response=response
1015 )
1016 if len(redirect_chain) > 20:
1017 # Such a lengthy chain likely also means a loop, but one with
1018 # a growing path, changing view, or changing query argument;
1019 # 20 is the value of "network.http.redirection-limit" from Firefox.
1020 raise RedirectCycleError("Too many redirects.", last_response=response)
1022 return response
1025class AsyncClient(ClientMixin, AsyncRequestFactory):
1026 """
1027 An async version of Client that creates ASGIRequests and calls through an
1028 async request path.
1030 Does not currently support "follow" on its methods.
1031 """
1033 def __init__(
1034 self, enforce_csrf_checks=False, raise_request_exception=True, **defaults
1035 ):
1036 super().__init__(**defaults)
1037 self.handler = AsyncClientHandler(enforce_csrf_checks)
1038 self.raise_request_exception = raise_request_exception
1039 self.exc_info = None
1040 self.extra = None
1042 async def request(self, **request):
1043 """
1044 The master request method. Compose the scope dictionary and pass to the
1045 handler, return the result of the handler. Assume defaults for the
1046 query environment, which can be overridden using the arguments to the
1047 request.
1048 """
1049 if "follow" in request:
1050 raise NotImplementedError(
1051 "AsyncClient request methods do not accept the follow parameter."
1052 )
1053 scope = self._base_scope(**request)
1054 # Curry a data dictionary into an instance of the template renderer
1055 # callback function.
1056 data = {}
1057 on_template_render = partial(store_rendered_templates, data)
1058 signal_uid = "template-render-%s" % id(request)
1059 signals.template_rendered.connect(on_template_render, dispatch_uid=signal_uid)
1060 # Capture exceptions created by the handler.
1061 exception_uid = "request-exception-%s" % id(request)
1062 got_request_exception.connect(self.store_exc_info, dispatch_uid=exception_uid)
1063 try:
1064 response = await self.handler(scope)
1065 finally:
1066 signals.template_rendered.disconnect(dispatch_uid=signal_uid)
1067 got_request_exception.disconnect(dispatch_uid=exception_uid)
1068 # Check for signaled exceptions.
1069 self.check_exception(response)
1070 # Save the client and request that stimulated the response.
1071 response.client = self
1072 response.request = request
1073 # Add any rendered template detail to the response.
1074 response.templates = data.get("templates", [])
1075 response.context = data.get("context")
1076 response.json = partial(self._parse_json, response)
1077 # Attach the ResolverMatch instance to the response.
1078 urlconf = getattr(response.asgi_request, "urlconf", None)
1079 response.resolver_match = SimpleLazyObject(
1080 lambda: resolve(request["path"], urlconf=urlconf),
1081 )
1082 # Flatten a single context. Not really necessary anymore thanks to the
1083 # __getattr__ flattening in ContextList, but has some edge case
1084 # backwards compatibility implications.
1085 if response.context and len(response.context) == 1:
1086 response.context = response.context[0]
1087 # Update persistent cookie data.
1088 if response.cookies:
1089 self.cookies.update(response.cookies)
1090 return response