Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/http/response.py: 53%

339 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1import datetime 

2import json 

3import mimetypes 

4import os 

5import re 

6import sys 

7import time 

8from collections.abc import Mapping 

9from email.header import Header 

10from http.client import responses 

11from urllib.parse import quote, urlparse 

12 

13from django.conf import settings 

14from django.core import signals, signing 

15from django.core.exceptions import DisallowedRedirect 

16from django.core.serializers.json import DjangoJSONEncoder 

17from django.http.cookie import SimpleCookie 

18from django.utils import timezone 

19from django.utils.datastructures import ( 

20 CaseInsensitiveMapping, 

21 _destruct_iterable_mapping_values, 

22) 

23from django.utils.encoding import iri_to_uri 

24from django.utils.http import http_date 

25from django.utils.regex_helper import _lazy_re_compile 

26 

27_charset_from_content_type_re = _lazy_re_compile( 

28 r";\s*charset=(?P<charset>[^\s;]+)", re.I 

29) 

30 

31 

32class ResponseHeaders(CaseInsensitiveMapping): 

33 def __init__(self, data): 

34 """ 

35 Populate the initial data using __setitem__ to ensure values are 

36 correctly encoded. 

37 """ 

38 if not isinstance(data, Mapping): 38 ↛ 39line 38 didn't jump to line 39, because the condition on line 38 was never true

39 data = {k: v for k, v in _destruct_iterable_mapping_values(data)} 

40 self._store = {} 

41 for header, value in data.items(): 41 ↛ 42line 41 didn't jump to line 42, because the loop on line 41 never started

42 self[header] = value 

43 

44 def _convert_to_charset(self, value, charset, mime_encode=False): 

45 """ 

46 Convert headers key/value to ascii/latin-1 native strings. 

47 `charset` must be 'ascii' or 'latin-1'. If `mime_encode` is True and 

48 `value` can't be represented in the given charset, apply MIME-encoding. 

49 """ 

50 if not isinstance(value, (bytes, str)): 50 ↛ 51line 50 didn't jump to line 51, because the condition on line 50 was never true

51 value = str(value) 

52 if (isinstance(value, bytes) and (b"\n" in value or b"\r" in value)) or ( 52 ↛ 55line 52 didn't jump to line 55, because the condition on line 52 was never true

53 isinstance(value, str) and ("\n" in value or "\r" in value) 

54 ): 

55 raise BadHeaderError( 

56 "Header values can't contain newlines (got %r)" % value 

57 ) 

58 try: 

59 if isinstance(value, str): 59 ↛ 64line 59 didn't jump to line 64, because the condition on line 59 was never false

60 # Ensure string is valid in given charset 

61 value.encode(charset) 

62 else: 

63 # Convert bytestring using given charset 

64 value = value.decode(charset) 

65 except UnicodeError as e: 

66 if mime_encode: 

67 value = Header(value, "utf-8", maxlinelen=sys.maxsize).encode() 

68 else: 

69 e.reason += ", HTTP response headers must be in %s format" % charset 

70 raise 

71 return value 

72 

73 def __delitem__(self, key): 

74 self.pop(key) 

75 

76 def __setitem__(self, key, value): 

77 key = self._convert_to_charset(key, "ascii") 

78 value = self._convert_to_charset(value, "latin-1", mime_encode=True) 

79 self._store[key.lower()] = (key, value) 

80 

81 def pop(self, key, default=None): 

82 return self._store.pop(key.lower(), default) 

83 

84 def setdefault(self, key, value): 

85 if key not in self: 85 ↛ exitline 85 didn't return from function 'setdefault', because the condition on line 85 was never false

86 self[key] = value 

87 

88 

89class BadHeaderError(ValueError): 

90 pass 

91 

92 

93class HttpResponseBase: 

94 """ 

95 An HTTP response base class with dictionary-accessed headers. 

96 

97 This class doesn't handle content. It should not be used directly. 

98 Use the HttpResponse and StreamingHttpResponse subclasses instead. 

99 """ 

100 

101 status_code = 200 

102 

103 def __init__( 

104 self, content_type=None, status=None, reason=None, charset=None, headers=None 

105 ): 

106 self.headers = ResponseHeaders(headers or {}) 

107 self._charset = charset 

108 if content_type and "Content-Type" in self.headers: 108 ↛ 109line 108 didn't jump to line 109, because the condition on line 108 was never true

109 raise ValueError( 

110 "'headers' must not contain 'Content-Type' when the " 

111 "'content_type' parameter is provided." 

112 ) 

113 if "Content-Type" not in self.headers: 113 ↛ 117line 113 didn't jump to line 117, because the condition on line 113 was never false

114 if content_type is None: 114 ↛ 116line 114 didn't jump to line 116, because the condition on line 114 was never false

115 content_type = "text/html; charset=%s" % self.charset 

116 self.headers["Content-Type"] = content_type 

117 self._resource_closers = [] 

118 # This parameter is set by the handler. It's necessary to preserve the 

119 # historical behavior of request_finished. 

120 self._handler_class = None 

121 self.cookies = SimpleCookie() 

122 self.closed = False 

123 if status is not None: 

124 try: 

125 self.status_code = int(status) 

126 except (ValueError, TypeError): 

127 raise TypeError("HTTP status code must be an integer.") 

128 

129 if not 100 <= self.status_code <= 599: 129 ↛ 130line 129 didn't jump to line 130, because the condition on line 129 was never true

130 raise ValueError("HTTP status code must be an integer from 100 to 599.") 

131 self._reason_phrase = reason 

132 

133 @property 

134 def reason_phrase(self): 

135 if self._reason_phrase is not None: 135 ↛ 136line 135 didn't jump to line 136, because the condition on line 135 was never true

136 return self._reason_phrase 

137 # Leave self._reason_phrase unset in order to use the default 

138 # reason phrase for status code. 

139 return responses.get(self.status_code, "Unknown Status Code") 

140 

141 @reason_phrase.setter 

142 def reason_phrase(self, value): 

143 self._reason_phrase = value 

144 

145 @property 

146 def charset(self): 

147 if self._charset is not None: 147 ↛ 148line 147 didn't jump to line 148, because the condition on line 147 was never true

148 return self._charset 

149 content_type = self.get("Content-Type", "") 

150 matched = _charset_from_content_type_re.search(content_type) 

151 if matched: 

152 # Extract the charset and strip its double quotes 

153 return matched["charset"].replace('"', "") 

154 return settings.DEFAULT_CHARSET 

155 

156 @charset.setter 

157 def charset(self, value): 

158 self._charset = value 

159 

160 def serialize_headers(self): 

161 """HTTP headers as a bytestring.""" 

162 return b"\r\n".join( 

163 [ 

164 key.encode("ascii") + b": " + value.encode("latin-1") 

165 for key, value in self.headers.items() 

166 ] 

167 ) 

168 

169 __bytes__ = serialize_headers 

170 

171 @property 

172 def _content_type_for_repr(self): 

173 return ( 

174 ', "%s"' % self.headers["Content-Type"] 

175 if "Content-Type" in self.headers 

176 else "" 

177 ) 

178 

179 def __setitem__(self, header, value): 

180 self.headers[header] = value 

181 

182 def __delitem__(self, header): 

183 del self.headers[header] 

184 

185 def __getitem__(self, header): 

186 return self.headers[header] 

187 

188 def has_header(self, header): 

189 """Case-insensitive check for a header.""" 

190 return header in self.headers 

191 

192 __contains__ = has_header 

193 

194 def items(self): 

195 return self.headers.items() 

196 

197 def get(self, header, alternate=None): 

198 return self.headers.get(header, alternate) 

199 

200 def set_cookie( 

201 self, 

202 key, 

203 value="", 

204 max_age=None, 

205 expires=None, 

206 path="/", 

207 domain=None, 

208 secure=False, 

209 httponly=False, 

210 samesite=None, 

211 ): 

212 """ 

213 Set a cookie. 

214 

215 ``expires`` can be: 

216 - a string in the correct format, 

217 - a naive ``datetime.datetime`` object in UTC, 

218 - an aware ``datetime.datetime`` object in any time zone. 

219 If it is a ``datetime.datetime`` object then calculate ``max_age``. 

220 """ 

221 self.cookies[key] = value 

222 if expires is not None: 

223 if isinstance(expires, datetime.datetime): 

224 if timezone.is_naive(expires): 

225 expires = timezone.make_aware(expires, timezone.utc) 

226 delta = expires - datetime.datetime.now(tz=timezone.utc) 

227 # Add one second so the date matches exactly (a fraction of 

228 # time gets lost between converting to a timedelta and 

229 # then the date string). 

230 delta = delta + datetime.timedelta(seconds=1) 

231 # Just set max_age - the max_age logic will set expires. 

232 expires = None 

233 max_age = max(0, delta.days * 86400 + delta.seconds) 

234 else: 

235 self.cookies[key]["expires"] = expires 

236 else: 

237 self.cookies[key]["expires"] = "" 

238 if max_age is not None: 

239 self.cookies[key]["max-age"] = int(max_age) 

240 # IE requires expires, so set it if hasn't been already. 

241 if not expires: 

242 self.cookies[key]["expires"] = http_date(time.time() + max_age) 

243 if path is not None: 

244 self.cookies[key]["path"] = path 

245 if domain is not None: 

246 self.cookies[key]["domain"] = domain 

247 if secure: 

248 self.cookies[key]["secure"] = True 

249 if httponly: 

250 self.cookies[key]["httponly"] = True 

251 if samesite: 

252 if samesite.lower() not in ("lax", "none", "strict"): 

253 raise ValueError('samesite must be "lax", "none", or "strict".') 

254 self.cookies[key]["samesite"] = samesite 

255 

256 def setdefault(self, key, value): 

257 """Set a header unless it has already been set.""" 

258 self.headers.setdefault(key, value) 

259 

260 def set_signed_cookie(self, key, value, salt="", **kwargs): 

261 value = signing.get_cookie_signer(salt=key + salt).sign(value) 

262 return self.set_cookie(key, value, **kwargs) 

263 

264 def delete_cookie(self, key, path="/", domain=None, samesite=None): 

265 # Browsers can ignore the Set-Cookie header if the cookie doesn't use 

266 # the secure flag and: 

267 # - the cookie name starts with "__Host-" or "__Secure-", or 

268 # - the samesite is "none". 

269 secure = key.startswith(("__Secure-", "__Host-")) or ( 

270 samesite and samesite.lower() == "none" 

271 ) 

272 self.set_cookie( 

273 key, 

274 max_age=0, 

275 path=path, 

276 domain=domain, 

277 secure=secure, 

278 expires="Thu, 01 Jan 1970 00:00:00 GMT", 

279 samesite=samesite, 

280 ) 

281 

282 # Common methods used by subclasses 

283 

284 def make_bytes(self, value): 

285 """Turn a value into a bytestring encoded in the output charset.""" 

286 # Per PEP 3333, this response body must be bytes. To avoid returning 

287 # an instance of a subclass, this function returns `bytes(value)`. 

288 # This doesn't make a copy when `value` already contains bytes. 

289 

290 # Handle string types -- we can't rely on force_bytes here because: 

291 # - Python attempts str conversion first 

292 # - when self._charset != 'utf-8' it re-encodes the content 

293 if isinstance(value, (bytes, memoryview)): 

294 return bytes(value) 

295 if isinstance(value, str): 295 ↛ 298line 295 didn't jump to line 298, because the condition on line 295 was never false

296 return bytes(value.encode(self.charset)) 

297 # Handle non-string types. 

298 return str(value).encode(self.charset) 

299 

300 # These methods partially implement the file-like object interface. 

301 # See https://docs.python.org/library/io.html#io.IOBase 

302 

303 # The WSGI server must call this method upon completion of the request. 

304 # See http://blog.dscpl.com.au/2012/10/obligations-for-calling-close-on.html 

305 def close(self): 

306 for closer in self._resource_closers: 

307 try: 

308 closer() 

309 except Exception: 

310 pass 

311 # Free resources that were still referenced. 

312 self._resource_closers.clear() 

313 self.closed = True 

314 signals.request_finished.send(sender=self._handler_class) 

315 

316 def write(self, content): 

317 raise OSError("This %s instance is not writable" % self.__class__.__name__) 

318 

319 def flush(self): 

320 pass 

321 

322 def tell(self): 

323 raise OSError( 

324 "This %s instance cannot tell its position" % self.__class__.__name__ 

325 ) 

326 

327 # These methods partially implement a stream-like object interface. 

328 # See https://docs.python.org/library/io.html#io.IOBase 

329 

330 def readable(self): 

331 return False 

332 

333 def seekable(self): 

334 return False 

335 

336 def writable(self): 

337 return False 

338 

339 def writelines(self, lines): 

340 raise OSError("This %s instance is not writable" % self.__class__.__name__) 

341 

342 

343class HttpResponse(HttpResponseBase): 

344 """ 

345 An HTTP response class with a string as content. 

346 

347 This content can be read, appended to, or replaced. 

348 """ 

349 

350 streaming = False 

351 

352 def __init__(self, content=b"", *args, **kwargs): 

353 super().__init__(*args, **kwargs) 

354 # Content is a bytestring. See the `content` property methods. 

355 self.content = content 

356 

357 def __repr__(self): 

358 return "<%(cls)s status_code=%(status_code)d%(content_type)s>" % { 

359 "cls": self.__class__.__name__, 

360 "status_code": self.status_code, 

361 "content_type": self._content_type_for_repr, 

362 } 

363 

364 def serialize(self): 

365 """Full HTTP message, including headers, as a bytestring.""" 

366 return self.serialize_headers() + b"\r\n\r\n" + self.content 

367 

368 __bytes__ = serialize 

369 

370 @property 

371 def content(self): 

372 return b"".join(self._container) 

373 

374 @content.setter 

375 def content(self, value): 

376 # Consume iterators upon assignment to allow repeated iteration. 

377 if hasattr(value, "__iter__") and not isinstance( 377 ↛ 380line 377 didn't jump to line 380, because the condition on line 377 was never true

378 value, (bytes, memoryview, str) 

379 ): 

380 content = b"".join(self.make_bytes(chunk) for chunk in value) 

381 if hasattr(value, "close"): 

382 try: 

383 value.close() 

384 except Exception: 

385 pass 

386 else: 

387 content = self.make_bytes(value) 

388 # Create a list of properly encoded bytestrings to support write(). 

389 self._container = [content] 

390 

391 def __iter__(self): 

392 return iter(self._container) 

393 

394 def write(self, content): 

395 self._container.append(self.make_bytes(content)) 

396 

397 def tell(self): 

398 return len(self.content) 

399 

400 def getvalue(self): 

401 return self.content 

402 

403 def writable(self): 

404 return True 

405 

406 def writelines(self, lines): 

407 for line in lines: 

408 self.write(line) 

409 

410 

411class StreamingHttpResponse(HttpResponseBase): 

412 """ 

413 A streaming HTTP response class with an iterator as content. 

414 

415 This should only be iterated once, when the response is streamed to the 

416 client. However, it can be appended to or replaced with a new iterator 

417 that wraps the original content (or yields entirely new content). 

418 """ 

419 

420 streaming = True 

421 

422 def __init__(self, streaming_content=(), *args, **kwargs): 

423 super().__init__(*args, **kwargs) 

424 # `streaming_content` should be an iterable of bytestrings. 

425 # See the `streaming_content` property methods. 

426 self.streaming_content = streaming_content 

427 

428 def __repr__(self): 

429 return "<%(cls)s status_code=%(status_code)d%(content_type)s>" % { 

430 "cls": self.__class__.__qualname__, 

431 "status_code": self.status_code, 

432 "content_type": self._content_type_for_repr, 

433 } 

434 

435 @property 

436 def content(self): 

437 raise AttributeError( 

438 "This %s instance has no `content` attribute. Use " 

439 "`streaming_content` instead." % self.__class__.__name__ 

440 ) 

441 

442 @property 

443 def streaming_content(self): 

444 return map(self.make_bytes, self._iterator) 

445 

446 @streaming_content.setter 

447 def streaming_content(self, value): 

448 self._set_streaming_content(value) 

449 

450 def _set_streaming_content(self, value): 

451 # Ensure we can never iterate on "value" more than once. 

452 self._iterator = iter(value) 

453 if hasattr(value, "close"): 

454 self._resource_closers.append(value.close) 

455 

456 def __iter__(self): 

457 return self.streaming_content 

458 

459 def getvalue(self): 

460 return b"".join(self.streaming_content) 

461 

462 

463class FileResponse(StreamingHttpResponse): 

464 """ 

465 A streaming HTTP response class optimized for files. 

466 """ 

467 

468 block_size = 4096 

469 

470 def __init__(self, *args, as_attachment=False, filename="", **kwargs): 

471 self.as_attachment = as_attachment 

472 self.filename = filename 

473 super().__init__(*args, **kwargs) 

474 

475 def _set_streaming_content(self, value): 

476 if not hasattr(value, "read"): 

477 self.file_to_stream = None 

478 return super()._set_streaming_content(value) 

479 

480 self.file_to_stream = filelike = value 

481 if hasattr(filelike, "close"): 

482 self._resource_closers.append(filelike.close) 

483 value = iter(lambda: filelike.read(self.block_size), b"") 

484 self.set_headers(filelike) 

485 super()._set_streaming_content(value) 

486 

487 def set_headers(self, filelike): 

488 """ 

489 Set some common response headers (Content-Length, Content-Type, and 

490 Content-Disposition) based on the `filelike` response content. 

491 """ 

492 encoding_map = { 

493 "bzip2": "application/x-bzip", 

494 "gzip": "application/gzip", 

495 "xz": "application/x-xz", 

496 } 

497 filename = getattr(filelike, "name", None) 

498 filename = ( 

499 filename if (isinstance(filename, str) and filename) else self.filename 

500 ) 

501 if os.path.isabs(filename): 

502 self.headers["Content-Length"] = os.path.getsize(filelike.name) 

503 elif hasattr(filelike, "getbuffer"): 

504 self.headers["Content-Length"] = filelike.getbuffer().nbytes 

505 

506 if self.headers.get("Content-Type", "").startswith("text/html"): 

507 if filename: 

508 content_type, encoding = mimetypes.guess_type(filename) 

509 # Encoding isn't set to prevent browsers from automatically 

510 # uncompressing files. 

511 content_type = encoding_map.get(encoding, content_type) 

512 self.headers["Content-Type"] = ( 

513 content_type or "application/octet-stream" 

514 ) 

515 else: 

516 self.headers["Content-Type"] = "application/octet-stream" 

517 

518 filename = self.filename or os.path.basename(filename) 

519 if filename: 

520 disposition = "attachment" if self.as_attachment else "inline" 

521 try: 

522 filename.encode("ascii") 

523 file_expr = 'filename="{}"'.format(filename) 

524 except UnicodeEncodeError: 

525 file_expr = "filename*=utf-8''{}".format(quote(filename)) 

526 self.headers["Content-Disposition"] = "{}; {}".format( 

527 disposition, file_expr 

528 ) 

529 elif self.as_attachment: 

530 self.headers["Content-Disposition"] = "attachment" 

531 

532 

533class HttpResponseRedirectBase(HttpResponse): 

534 allowed_schemes = ["http", "https", "ftp"] 

535 

536 def __init__(self, redirect_to, *args, **kwargs): 

537 super().__init__(*args, **kwargs) 

538 self["Location"] = iri_to_uri(redirect_to) 

539 parsed = urlparse(str(redirect_to)) 

540 if parsed.scheme and parsed.scheme not in self.allowed_schemes: 

541 raise DisallowedRedirect( 

542 "Unsafe redirect to URL with protocol '%s'" % parsed.scheme 

543 ) 

544 

545 url = property(lambda self: self["Location"]) 545 ↛ exitline 545 didn't run the lambda on line 545

546 

547 def __repr__(self): 

548 return ( 

549 '<%(cls)s status_code=%(status_code)d%(content_type)s, url="%(url)s">' 

550 % { 

551 "cls": self.__class__.__name__, 

552 "status_code": self.status_code, 

553 "content_type": self._content_type_for_repr, 

554 "url": self.url, 

555 } 

556 ) 

557 

558 

559class HttpResponseRedirect(HttpResponseRedirectBase): 

560 status_code = 302 

561 

562 

563class HttpResponsePermanentRedirect(HttpResponseRedirectBase): 

564 status_code = 301 

565 

566 

567class HttpResponseNotModified(HttpResponse): 

568 status_code = 304 

569 

570 def __init__(self, *args, **kwargs): 

571 super().__init__(*args, **kwargs) 

572 del self["content-type"] 

573 

574 @HttpResponse.content.setter 

575 def content(self, value): 

576 if value: 

577 raise AttributeError( 

578 "You cannot set content to a 304 (Not Modified) response" 

579 ) 

580 self._container = [] 

581 

582 

583class HttpResponseBadRequest(HttpResponse): 

584 status_code = 400 

585 

586 

587class HttpResponseNotFound(HttpResponse): 

588 status_code = 404 

589 

590 

591class HttpResponseForbidden(HttpResponse): 

592 status_code = 403 

593 

594 

595class HttpResponseNotAllowed(HttpResponse): 

596 status_code = 405 

597 

598 def __init__(self, permitted_methods, *args, **kwargs): 

599 super().__init__(*args, **kwargs) 

600 self["Allow"] = ", ".join(permitted_methods) 

601 

602 def __repr__(self): 

603 return "<%(cls)s [%(methods)s] status_code=%(status_code)d%(content_type)s>" % { 

604 "cls": self.__class__.__name__, 

605 "status_code": self.status_code, 

606 "content_type": self._content_type_for_repr, 

607 "methods": self["Allow"], 

608 } 

609 

610 

611class HttpResponseGone(HttpResponse): 

612 status_code = 410 

613 

614 

615class HttpResponseServerError(HttpResponse): 

616 status_code = 500 

617 

618 

619class Http404(Exception): 

620 pass 

621 

622 

623class JsonResponse(HttpResponse): 

624 """ 

625 An HTTP response class that consumes data to be serialized to JSON. 

626 

627 :param data: Data to be dumped into json. By default only ``dict`` objects 

628 are allowed to be passed due to a security flaw before ECMAScript 5. See 

629 the ``safe`` parameter for more information. 

630 :param encoder: Should be a json encoder class. Defaults to 

631 ``django.core.serializers.json.DjangoJSONEncoder``. 

632 :param safe: Controls if only ``dict`` objects may be serialized. Defaults 

633 to ``True``. 

634 :param json_dumps_params: A dictionary of kwargs passed to json.dumps(). 

635 """ 

636 

637 def __init__( 

638 self, 

639 data, 

640 encoder=DjangoJSONEncoder, 

641 safe=True, 

642 json_dumps_params=None, 

643 **kwargs, 

644 ): 

645 if safe and not isinstance(data, dict): 

646 raise TypeError( 

647 "In order to allow non-dict objects to be serialized set the " 

648 "safe parameter to False." 

649 ) 

650 if json_dumps_params is None: 

651 json_dumps_params = {} 

652 kwargs.setdefault("content_type", "application/json") 

653 data = json.dumps(data, cls=encoder, **json_dumps_params) 

654 super().__init__(content=data, **kwargs)