Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/click/_compat.py: 20%

314 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1import codecs 

2import io 

3import os 

4import re 

5import sys 

6import typing as t 

7from weakref import WeakKeyDictionary 

8 

# Platform detection flags, evaluated once at import time.
CYGWIN = sys.platform.startswith("cygwin")
MSYS2 = sys.platform.startswith("win") and "GCC" in sys.version
# Determine local App Engine environment, per Google's own suggestion
APP_ENGINE = (
    "APPENGINE_RUNTIME" in os.environ
    and "Development/" in os.environ.get("SERVER_SOFTWARE", "")
)
# Windows proper: excludes the App Engine dev server and MSYS2 builds.
WIN = sys.platform.startswith("win") and not (APP_ENGINE or MSYS2)
# Starts out as None; the Windows-only branch further down rebinds it.
auto_wrap_for_ansi: t.Optional[t.Callable[[t.TextIO], t.TextIO]] = None
# Matches "ESC [ <params> <letter>" escape sequences.
_ansi_re = re.compile(r"\033\[[;?0-9]*[a-zA-Z]")

18 

19 

def get_filesystem_encoding() -> str:
    """Return the filesystem encoding, or the default encoding if it is unset."""
    fs_encoding = sys.getfilesystemencoding()
    if fs_encoding:
        return fs_encoding
    return sys.getdefaultencoding()

22 

23 

def _make_text_stream(
    stream: t.BinaryIO,
    encoding: t.Optional[str],
    errors: t.Optional[str],
    force_readable: bool = False,
    force_writable: bool = False,
) -> t.TextIO:
    """Wrap a binary stream in a line-buffered, non-closing text wrapper.

    A missing ``encoding`` is resolved with ``get_best_encoding(stream)``
    and a missing ``errors`` falls back to ``"replace"``.
    """
    chosen_encoding = get_best_encoding(stream) if encoding is None else encoding
    chosen_errors = "replace" if errors is None else errors
    return _NonClosingTextIOWrapper(
        stream,
        chosen_encoding,
        chosen_errors,
        line_buffering=True,
        force_readable=force_readable,
        force_writable=force_writable,
    )

43 

44 

def is_ascii_encoding(encoding: str) -> bool:
    """Tell whether ``encoding`` names the ascii codec; unknown names are not ascii."""
    try:
        codec_info = codecs.lookup(encoding)
    except LookupError:
        return False
    return codec_info.name == "ascii"

51 

52 

def get_best_encoding(stream: t.IO) -> str:
    """Pick an encoding for ``stream``.

    Uses the stream's ``encoding`` attribute when it is set, otherwise the
    interpreter default. An ascii result is upgraded to ``"utf-8"``.
    """
    candidate = getattr(stream, "encoding", None)
    if not candidate:
        candidate = sys.getdefaultencoding()
    return "utf-8" if is_ascii_encoding(candidate) else candidate

59 

60 

class _NonClosingTextIOWrapper(io.TextIOWrapper):
    """Text wrapper that detaches from its underlying stream when collected."""

    def __init__(
        self,
        stream: t.BinaryIO,
        encoding: t.Optional[str],
        errors: t.Optional[str],
        force_readable: bool = False,
        force_writable: bool = False,
        **extra: t.Any,
    ) -> None:
        # The raw stream is wrapped in _FixupStream before TextIOWrapper sees it.
        fixed = _FixupStream(stream, force_readable, force_writable)
        self._stream = t.cast(t.BinaryIO, fixed)
        super().__init__(self._stream, encoding, errors, **extra)

    def __del__(self) -> None:
        # Detach on collection; any failure while doing so is ignored.
        try:
            self.detach()
        except Exception:
            pass

    def isatty(self) -> bool:
        """Report the tty status of the wrapped stream."""
        # https://bitbucket.org/pypy/pypy/issue/1803
        return self._stream.isatty()

85 

86 

class _FixupStream:
    """Adapter supplying the io methods that some stream objects lack.

    The new io interface needs more from streams than streams
    traditionally implement, so ``read1``, ``readable``, ``writable`` and
    ``seekable`` are provided here with fallbacks. All other attribute
    access is forwarded to the wrapped stream.

    The readable/writable overrides exist because some tools put badly
    patched objects on sys (certain versions of jupyter notebook are one
    such offender).
    """

    def __init__(
        self,
        stream: t.BinaryIO,
        force_readable: bool = False,
        force_writable: bool = False,
    ):
        self._stream = stream
        self._force_readable = force_readable
        self._force_writable = force_writable

    def __getattr__(self, name: str) -> t.Any:
        # Reached only for attributes not found on the adapter itself.
        return getattr(self._stream, name)

    def read1(self, size: int) -> bytes:
        """Use the stream's own ``read1`` when present, else plain ``read``."""
        native = getattr(self._stream, "read1", None)
        if native is None:
            return self._stream.read(size)
        return t.cast(bytes, native(size))

    def readable(self) -> bool:
        """Return True if forced, else ask the stream, else probe with ``read(0)``."""
        if self._force_readable:
            return True
        probe = getattr(self._stream, "readable", None)
        if probe is not None:
            return t.cast(bool, probe())
        try:
            self._stream.read(0)
        except Exception:
            return False
        else:
            return True

    def writable(self) -> bool:
        """Return True if forced, else ask the stream, else probe with empty writes."""
        if self._force_writable:
            return True
        probe = getattr(self._stream, "writable", None)
        if probe is not None:
            return t.cast(bool, probe())
        # Try an empty text write first, then an empty bytes write.
        for empty in ("", b""):
            try:
                self._stream.write(empty)  # type: ignore
            except Exception:
                continue
            return True
        return False

    def seekable(self) -> bool:
        """Ask the stream, else probe by seeking to the current position."""
        probe = getattr(self._stream, "seekable", None)
        if probe is not None:
            return t.cast(bool, probe())
        try:
            position = self._stream.tell()
            self._stream.seek(position)
        except Exception:
            return False
        else:
            return True

154 

155 

def _is_binary_reader(stream: t.IO, default: bool = False) -> bool:
    """Tell whether a zero-length read from ``stream`` yields ``bytes``.

    ``default`` is returned when the read itself raises.
    """
    try:
        sample = stream.read(0)
    except Exception:
        # This happens in some cases where the stream was already
        # closed. In this case, we assume the default.
        return default
    return isinstance(sample, bytes)

163 

164 

def _is_binary_writer(stream: t.IO, default: bool = False) -> bool:
    """Tell whether ``stream`` accepts bytes.

    An empty bytes write is tried first. If it fails, an empty text write
    is tried: success means the stream is a text writer. If both writes
    fail, ``default`` is returned.
    """
    try:
        stream.write(b"")
    except Exception:
        pass
    else:
        return True

    try:
        stream.write("")
    except Exception:
        return default
    return False

176 

177 

def _find_binary_reader(stream: t.IO) -> t.Optional[t.BinaryIO]:
    """Return a binary reader for ``stream``, or None if none can be found."""
    # The stream may already be binary: the official docs recommend
    # detaching the standard streams to get binary ones, and some code
    # does exactly that.
    if _is_binary_reader(stream, False):
        return t.cast(t.BinaryIO, stream)

    underlying = getattr(stream, "buffer", None)

    # For the buffer, a failed probe (e.g. it is closed) is treated as
    # "binary", hence the True default.
    if underlying is None or not _is_binary_reader(underlying, True):
        return None
    return t.cast(t.BinaryIO, underlying)

194 

195 

def _find_binary_writer(stream: t.IO) -> t.Optional[t.BinaryIO]:
    """Return a binary writer for ``stream``, or None if none can be found."""
    # The stream may already be binary: the official docs recommend
    # detaching the standard streams to get binary ones, and some code
    # does exactly that.
    if _is_binary_writer(stream, False):
        return t.cast(t.BinaryIO, stream)

    underlying = getattr(stream, "buffer", None)

    # For the buffer, a failed probe (e.g. it is closed) is treated as
    # "binary", hence the True default.
    if underlying is None or not _is_binary_writer(underlying, True):
        return None
    return t.cast(t.BinaryIO, underlying)

212 

213 

def _stream_is_misconfigured(stream: t.TextIO) -> bool:
    """A stream counts as misconfigured when its encoding is ASCII."""
    # A stream with no encoding set is treated as ASCII. This appears to
    # happen in certain unittest environments. It's not quite clear what
    # the correct behavior is, but this at least forces Click to recover
    # somehow.
    declared = getattr(stream, "encoding", None)
    return is_ascii_encoding(declared if declared else "ascii")

221 

222 

def _is_compat_stream_attr(stream: t.TextIO, attr: str, value: t.Optional[str]) -> bool:
    """Check one stream attribute against a desired value.

    The attribute is compatible when it equals ``value``, or when
    ``value`` is None and the attribute holds something other than None.
    """
    actual = getattr(stream, attr, None)
    if actual == value:
        return True
    return value is None and actual is not None

230 

231 

def _is_compatible_text_stream(
    stream: t.TextIO, encoding: t.Optional[str], errors: t.Optional[str]
) -> bool:
    """Tell whether both the ``encoding`` and ``errors`` attributes of
    ``stream`` are compatible with the requested values.
    """
    if not _is_compat_stream_attr(stream, "encoding", encoding):
        return False
    return _is_compat_stream_attr(stream, "errors", errors)

241 

242 

def _force_correct_text_stream(
    text_stream: t.IO,
    encoding: t.Optional[str],
    errors: t.Optional[str],
    is_binary: t.Callable[[t.IO, bool], bool],
    find_binary: t.Callable[[t.IO], t.Optional[t.BinaryIO]],
    force_readable: bool = False,
    force_writable: bool = False,
) -> t.TextIO:
    """Return a text stream with the requested encoding settings.

    ``is_binary`` and ``find_binary`` are the reader- or writer-specific
    probes. The input is returned as-is when it is already suitable, or
    when no binary stream can be found underneath it. Otherwise the binary
    stream is wrapped in a new text stream.
    """
    if is_binary(text_stream, False):
        raw = t.cast(t.BinaryIO, text_stream)
    else:
        as_text = t.cast(t.TextIO, text_stream)

        # If the stream looks compatible, and won't default to a
        # misconfigured ascii encoding, return it as-is.
        looks_compatible = _is_compatible_text_stream(as_text, encoding, errors)
        if looks_compatible and (
            encoding is not None or not _stream_is_misconfigured(as_text)
        ):
            return as_text

        # Otherwise, get the underlying binary stream.
        found = find_binary(as_text)

        # If that's not possible, silently use the original stream
        # and get mojibake instead of exceptions.
        if found is None:
            return as_text

        raw = found

    # Default errors to replace instead of strict in order to get
    # something that works, then wrap the binary stream in a text stream
    # with the correct encoding parameters.
    return _make_text_stream(
        raw,
        encoding,
        "replace" if errors is None else errors,
        force_readable=force_readable,
        force_writable=force_writable,
    )

287 

288 

def _force_correct_text_reader(
    text_reader: t.IO,
    encoding: t.Optional[str],
    errors: t.Optional[str],
    force_readable: bool = False,
) -> t.TextIO:
    """Reader flavour of ``_force_correct_text_stream`` using the reader probes."""
    return _force_correct_text_stream(
        text_reader,
        encoding,
        errors,
        is_binary=_is_binary_reader,
        find_binary=_find_binary_reader,
        force_readable=force_readable,
    )

303 

304 

def _force_correct_text_writer(
    text_writer: t.IO,
    encoding: t.Optional[str],
    errors: t.Optional[str],
    force_writable: bool = False,
) -> t.TextIO:
    """Writer flavour of ``_force_correct_text_stream`` using the writer probes."""
    return _force_correct_text_stream(
        text_writer,
        encoding,
        errors,
        is_binary=_is_binary_writer,
        find_binary=_find_binary_writer,
        force_writable=force_writable,
    )

319 

320 

def get_binary_stdin() -> t.BinaryIO:
    """Return the binary stream behind ``sys.stdin``.

    Raises RuntimeError when no binary stream can be determined.
    """
    binary_stdin = _find_binary_reader(sys.stdin)
    if binary_stdin is not None:
        return binary_stdin
    raise RuntimeError("Was not able to determine binary stream for sys.stdin.")

326 

327 

def get_binary_stdout() -> t.BinaryIO:
    """Return the binary stream behind ``sys.stdout``.

    Raises RuntimeError when no binary stream can be determined.
    """
    binary_stdout = _find_binary_writer(sys.stdout)
    if binary_stdout is not None:
        return binary_stdout
    raise RuntimeError("Was not able to determine binary stream for sys.stdout.")

333 

334 

def get_binary_stderr() -> t.BinaryIO:
    """Return the binary stream behind ``sys.stderr``.

    Raises RuntimeError when no binary stream can be determined.
    """
    binary_stderr = _find_binary_writer(sys.stderr)
    if binary_stderr is not None:
        return binary_stderr
    raise RuntimeError("Was not able to determine binary stream for sys.stderr.")

340 

341 

def get_text_stdin(
    encoding: t.Optional[str] = None, errors: t.Optional[str] = None
) -> t.TextIO:
    """Return a text stream for ``sys.stdin`` with the given settings.

    A Windows console stream is used when one is provided. Otherwise
    ``sys.stdin`` is corrected as a reader, with readability forced.
    """
    console = _get_windows_console_stream(sys.stdin, encoding, errors)
    if console is None:
        return _force_correct_text_reader(
            sys.stdin, encoding, errors, force_readable=True
        )
    return console

349 

350 

def get_text_stdout(
    encoding: t.Optional[str] = None, errors: t.Optional[str] = None
) -> t.TextIO:
    """Return a text stream for ``sys.stdout`` with the given settings.

    A Windows console stream is used when one is provided. Otherwise
    ``sys.stdout`` is corrected as a writer, with writability forced.
    """
    console = _get_windows_console_stream(sys.stdout, encoding, errors)
    if console is None:
        return _force_correct_text_writer(
            sys.stdout, encoding, errors, force_writable=True
        )
    return console

358 

359 

def get_text_stderr(
    encoding: t.Optional[str] = None, errors: t.Optional[str] = None
) -> t.TextIO:
    """Return a text stream for ``sys.stderr`` with the given settings.

    A Windows console stream is used when one is provided. Otherwise
    ``sys.stderr`` is corrected as a writer, with writability forced.
    """
    console = _get_windows_console_stream(sys.stderr, encoding, errors)
    if console is None:
        return _force_correct_text_writer(
            sys.stderr, encoding, errors, force_writable=True
        )
    return console

367 

368 

def _wrap_io_open(
    file: t.Union[str, os.PathLike, int],
    mode: str,
    encoding: t.Optional[str],
    errors: t.Optional[str],
) -> t.IO:
    """Open ``file``, passing ``encoding`` and ``errors`` only in text mode."""
    if "b" not in mode:
        return open(file, mode, encoding=encoding, errors=errors)

    # Binary mode: the text-only arguments are left out.
    return open(file, mode)

380 

381 

def open_stream(
    filename: str,
    mode: str = "r",
    encoding: t.Optional[str] = None,
    errors: t.Optional[str] = "strict",
    atomic: bool = False,
) -> t.Tuple[t.IO, bool]:
    """Open ``filename`` and return ``(stream, flag)``.

    ``"-"`` selects a standard stream (stdout for ``w``/``a``/``x`` modes,
    stdin otherwise) and yields ``flag=False``. Real files yield
    ``flag=True`` (presumably "caller should close"; verify against
    callers). With ``atomic=True`` the data goes to a temporary file in the
    same directory, wrapped in ``_AtomicFile``.

    Raises ValueError for atomic opens whose mode contains ``a`` or ``x``,
    or lacks ``w``.
    """
    binary = "b" in mode

    # Standard streams first. These are simple because they ignore the
    # atomic flag. Use fsdecode to handle Path("-").
    if os.fsdecode(filename) == "-":
        wants_output = any(flag in mode for flag in ("w", "a", "x"))
        if wants_output:
            if binary:
                return get_binary_stdout(), False
            return get_text_stdout(encoding=encoding, errors=errors), False
        if binary:
            return get_binary_stdin(), False
        return get_text_stdin(encoding=encoding, errors=errors), False

    # Non-atomic writes directly go out through the regular open functions.
    if not atomic:
        return _wrap_io_open(filename, mode, encoding, errors), True

    # Some usability stuff for atomic writes
    if "a" in mode:
        raise ValueError(
            "Appending to an existing file is not supported, because that"
            " would involve an expensive `copy`-operation to a temporary"
            " file. Open the file in normal `w`-mode and copy explicitly"
            " if that's what you're after."
        )
    if "x" in mode:
        raise ValueError("Use the `overwrite`-parameter instead.")
    if "w" not in mode:
        raise ValueError("Atomic writes only make sense with `w`-mode.")

    # Atomic writes are more complicated. They work by opening a file
    # as a proxy in the same folder and then using the fdopen
    # functionality to wrap it in a Python file. Then we wrap it in an
    # atomic file that moves the file over on close.
    import errno
    import random

    # Reuse the mode bits of an existing target, if there is one.
    perm: t.Optional[int]
    try:
        perm = os.stat(filename).st_mode
    except OSError:
        perm = None

    flags = os.O_RDWR | os.O_CREAT | os.O_EXCL
    if binary:
        flags |= getattr(os, "O_BINARY", 0)

    create_mode = 0o666 if perm is None else perm
    directory = os.path.dirname(filename)

    # Keep drawing random names until one can be created exclusively.
    while True:
        tmp_filename = os.path.join(
            directory,
            f".__atomic-write{random.randrange(1 << 32):08x}",
        )
        try:
            fd = os.open(tmp_filename, flags, create_mode)
        except OSError as e:
            name_taken = e.errno == errno.EEXIST or (
                os.name == "nt"
                and e.errno == errno.EACCES
                and os.path.isdir(e.filename)
                and os.access(e.filename, os.W_OK)
            )
            if not name_taken:
                raise
        else:
            break

    if perm is not None:
        os.chmod(tmp_filename, perm)  # in case perm includes bits in umask

    handle = _wrap_io_open(fd, mode, encoding, errors)
    atomic_file = _AtomicFile(handle, tmp_filename, os.path.realpath(filename))
    return t.cast(t.IO, atomic_file), True

460 

461 

class _AtomicFile:
    """File proxy that writes to a temporary file and moves it into place.

    Attribute access not handled here is forwarded to the wrapped file
    object. On a normal close the temporary file replaces the real file.
    When closed with ``delete=True`` (as ``__exit__`` does after an
    exception) the temporary file is discarded and the real file is left
    untouched.
    """

    def __init__(self, f: t.IO, tmp_filename: str, real_filename: str) -> None:
        self._f = f
        self._tmp_filename = tmp_filename
        self._real_filename = real_filename
        self.closed = False

    @property
    def name(self) -> str:
        """The destination path, not the temporary one."""
        return self._real_filename

    def close(self, delete: bool = False) -> None:
        """Close the wrapped file and finalize the write.

        :param delete: when true, remove the temporary file instead of
            moving it over the real file.

        Calling ``close`` again after it has completed is a no-op.
        """
        if self.closed:
            return
        self._f.close()
        if delete:
            # Discard the partial output so it never replaces the target.
            # Cleanup is best effort: this path runs while another
            # exception is propagating and must not mask it.
            try:
                os.remove(self._tmp_filename)
            except OSError:
                pass
        else:
            os.replace(self._tmp_filename, self._real_filename)
        self.closed = True

    def __getattr__(self, name: str) -> t.Any:
        # Reached only for attributes not found on the proxy itself.
        return getattr(self._f, name)

    def __enter__(self) -> "_AtomicFile":
        return self

    def __exit__(self, exc_type, exc_value, tb):  # type: ignore
        # An exception inside the with-block requests deletion.
        self.close(delete=exc_type is not None)

    def __repr__(self) -> str:
        return repr(self._f)

491 

492 

def strip_ansi(value: str) -> str:
    """Return ``value`` with every match of ``_ansi_re`` removed."""
    cleaned = _ansi_re.sub("", value)
    return cleaned

495 

496 

def _is_jupyter_kernel_output(stream: t.IO) -> bool:
    """Tell whether the innermost stream's class lives in an ``ipykernel.`` module."""
    wrapper_types = (_FixupStream, _NonClosingTextIOWrapper)
    innermost = stream

    # Peel off this module's own wrappers to reach the real stream.
    while isinstance(innermost, wrapper_types):
        innermost = innermost._stream

    module_name = innermost.__class__.__module__
    return module_name.startswith("ipykernel.")

502 

503 

def should_strip_ansi(
    stream: t.Optional[t.IO] = None, color: t.Optional[bool] = None
) -> bool:
    """Decide whether ANSI codes should be stripped.

    An explicit ``color`` wins: the result is simply ``not color``.
    Otherwise codes are stripped unless the stream is a tty or a Jupyter
    kernel output stream.
    """
    if color is not None:
        return not color

    # NOTE(review): the fallback stream is sys.stdin; confirm that this
    # is the intended stream to inspect.
    target = sys.stdin if stream is None else stream
    return not (isatty(target) or _is_jupyter_kernel_output(target))

512 

513 

# On Windows, wrap the output streams with colorama to support ANSI
# color codes.
# NOTE: double check is needed so mypy does not analyze this on Linux
if sys.platform.startswith("win") and WIN:
    from ._winconsole import _get_windows_console_stream

    def _get_argv_encoding() -> str:
        """Return the locale's preferred encoding."""
        import locale

        return locale.getpreferredencoding()

    # Wrapped streams are remembered per source stream, keyed weakly.
    _ansi_stream_wrappers: t.MutableMapping[t.TextIO, t.TextIO] = WeakKeyDictionary()

    def auto_wrap_for_ansi(
        stream: t.TextIO, color: t.Optional[bool] = None
    ) -> t.TextIO:
        """Support ANSI color and style codes on Windows by wrapping a
        stream with colorama.
        """
        # The lookup may fail for some streams; treat that as a cache miss.
        try:
            known = _ansi_stream_wrappers.get(stream)
        except Exception:
            known = None

        if known is not None:
            return known

        import colorama

        converter = colorama.AnsiToWin32(
            stream, strip=should_strip_ansi(stream, color)
        )
        wrapped = t.cast(t.TextIO, converter.stream)
        original_write = wrapped.write

        def _safe_write(s):
            # Reset the console state if a write is interrupted, then re-raise.
            try:
                return original_write(s)
            except BaseException:
                converter.reset_all()
                raise

        wrapped.write = _safe_write

        # Caching is best effort; a stream that cannot be stored is skipped.
        try:
            _ansi_stream_wrappers[stream] = wrapped
        except Exception:
            pass

        return wrapped

else:

    def _get_argv_encoding() -> str:
        """Return stdin's encoding, or the filesystem encoding if it is unset."""
        stdin_encoding = getattr(sys.stdin, "encoding", None)
        if stdin_encoding:
            return stdin_encoding
        return get_filesystem_encoding()

    def _get_windows_console_stream(
        f: t.TextIO, encoding: t.Optional[str], errors: t.Optional[str]
    ) -> t.Optional[t.TextIO]:
        """Always None in this branch: no Windows console stream exists."""
        return None

573 

574 

def term_len(x: str) -> int:
    """Return the length of ``x`` after ANSI codes are stripped."""
    visible = strip_ansi(x)
    return len(visible)

577 

578 

def isatty(stream: t.IO) -> bool:
    """Return ``stream.isatty()``, or False if that call raises."""
    try:
        answer = stream.isatty()
    except Exception:
        return False
    return answer

584 

585 

def _make_cached_stream_func(
    src_func: t.Callable[[], t.TextIO], wrapper_func: t.Callable[[], t.TextIO]
) -> t.Callable[[], t.TextIO]:
    """Build a zero-argument function that caches ``wrapper_func()`` results.

    Results are cached per object returned by ``src_func()``, in a
    WeakKeyDictionary. Cache lookups and stores that raise are ignored, in
    which case ``wrapper_func`` is simply called again next time.
    """
    cache: t.MutableMapping[t.TextIO, t.TextIO] = WeakKeyDictionary()

    def func() -> t.TextIO:
        key = src_func()

        try:
            hit = cache.get(key)
        except Exception:
            hit = None

        if hit is None:
            hit = wrapper_func()
            try:
                cache[key] = hit
            except Exception:
                pass

        return hit

    return func

607 

608 

# Cached accessors for the default text streams. Each lambda re-reads the
# current ``sys`` attribute on every call, so the cache is keyed on
# whichever stream object is installed at that moment.
_default_text_stdin = _make_cached_stream_func(
    lambda: sys.stdin,
    get_text_stdin,
)
_default_text_stdout = _make_cached_stream_func(
    lambda: sys.stdout,
    get_text_stdout,
)
_default_text_stderr = _make_cached_stream_func(
    lambda: sys.stderr,
    get_text_stderr,
)

612 

613 

# Lookup tables from a standard stream name to the function producing it.
binary_streams: t.Mapping[str, t.Callable[[], t.BinaryIO]] = dict(
    stdin=get_binary_stdin,
    stdout=get_binary_stdout,
    stderr=get_binary_stderr,
)

text_streams: t.Mapping[
    str, t.Callable[[t.Optional[str], t.Optional[str]], t.TextIO]
] = dict(
    stdin=get_text_stdin,
    stdout=get_text_stdout,
    stderr=get_text_stderr,
)

626}