Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/click/_compat.py: 20%
314 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1import codecs
2import io
3import os
4import re
5import sys
6import typing as t
7from weakref import WeakKeyDictionary
9CYGWIN = sys.platform.startswith("cygwin")
10MSYS2 = sys.platform.startswith("win") and ("GCC" in sys.version)
11# Determine local App Engine environment, per Google's own suggestion
12APP_ENGINE = "APPENGINE_RUNTIME" in os.environ and "Development/" in os.environ.get(
13 "SERVER_SOFTWARE", ""
14)
15WIN = sys.platform.startswith("win") and not APP_ENGINE and not MSYS2
16auto_wrap_for_ansi: t.Optional[t.Callable[[t.TextIO], t.TextIO]] = None
17_ansi_re = re.compile(r"\033\[[;?0-9]*[a-zA-Z]")
20def get_filesystem_encoding() -> str:
21 return sys.getfilesystemencoding() or sys.getdefaultencoding()
24def _make_text_stream(
25 stream: t.BinaryIO,
26 encoding: t.Optional[str],
27 errors: t.Optional[str],
28 force_readable: bool = False,
29 force_writable: bool = False,
30) -> t.TextIO:
31 if encoding is None:
32 encoding = get_best_encoding(stream)
33 if errors is None:
34 errors = "replace"
35 return _NonClosingTextIOWrapper(
36 stream,
37 encoding,
38 errors,
39 line_buffering=True,
40 force_readable=force_readable,
41 force_writable=force_writable,
42 )
45def is_ascii_encoding(encoding: str) -> bool:
46 """Checks if a given encoding is ascii."""
47 try:
48 return codecs.lookup(encoding).name == "ascii"
49 except LookupError:
50 return False
53def get_best_encoding(stream: t.IO) -> str:
54 """Returns the default stream encoding if not found."""
55 rv = getattr(stream, "encoding", None) or sys.getdefaultencoding()
56 if is_ascii_encoding(rv):
57 return "utf-8"
58 return rv
61class _NonClosingTextIOWrapper(io.TextIOWrapper):
62 def __init__(
63 self,
64 stream: t.BinaryIO,
65 encoding: t.Optional[str],
66 errors: t.Optional[str],
67 force_readable: bool = False,
68 force_writable: bool = False,
69 **extra: t.Any,
70 ) -> None:
71 self._stream = stream = t.cast(
72 t.BinaryIO, _FixupStream(stream, force_readable, force_writable)
73 )
74 super().__init__(stream, encoding, errors, **extra)
76 def __del__(self) -> None:
77 try:
78 self.detach()
79 except Exception:
80 pass
82 def isatty(self) -> bool:
83 # https://bitbucket.org/pypy/pypy/issue/1803
84 return self._stream.isatty()
87class _FixupStream:
88 """The new io interface needs more from streams than streams
89 traditionally implement. As such, this fix-up code is necessary in
90 some circumstances.
92 The forcing of readable and writable flags are there because some tools
93 put badly patched objects on sys (one such offender are certain version
94 of jupyter notebook).
95 """
97 def __init__(
98 self,
99 stream: t.BinaryIO,
100 force_readable: bool = False,
101 force_writable: bool = False,
102 ):
103 self._stream = stream
104 self._force_readable = force_readable
105 self._force_writable = force_writable
107 def __getattr__(self, name: str) -> t.Any:
108 return getattr(self._stream, name)
110 def read1(self, size: int) -> bytes:
111 f = getattr(self._stream, "read1", None)
113 if f is not None:
114 return t.cast(bytes, f(size))
116 return self._stream.read(size)
118 def readable(self) -> bool:
119 if self._force_readable:
120 return True
121 x = getattr(self._stream, "readable", None)
122 if x is not None:
123 return t.cast(bool, x())
124 try:
125 self._stream.read(0)
126 except Exception:
127 return False
128 return True
130 def writable(self) -> bool:
131 if self._force_writable:
132 return True
133 x = getattr(self._stream, "writable", None)
134 if x is not None:
135 return t.cast(bool, x())
136 try:
137 self._stream.write("") # type: ignore
138 except Exception:
139 try:
140 self._stream.write(b"")
141 except Exception:
142 return False
143 return True
145 def seekable(self) -> bool:
146 x = getattr(self._stream, "seekable", None)
147 if x is not None:
148 return t.cast(bool, x())
149 try:
150 self._stream.seek(self._stream.tell())
151 except Exception:
152 return False
153 return True
156def _is_binary_reader(stream: t.IO, default: bool = False) -> bool:
157 try:
158 return isinstance(stream.read(0), bytes)
159 except Exception:
160 return default
161 # This happens in some cases where the stream was already
162 # closed. In this case, we assume the default.
165def _is_binary_writer(stream: t.IO, default: bool = False) -> bool:
166 try:
167 stream.write(b"")
168 except Exception:
169 try:
170 stream.write("")
171 return False
172 except Exception:
173 pass
174 return default
175 return True
178def _find_binary_reader(stream: t.IO) -> t.Optional[t.BinaryIO]:
179 # We need to figure out if the given stream is already binary.
180 # This can happen because the official docs recommend detaching
181 # the streams to get binary streams. Some code might do this, so
182 # we need to deal with this case explicitly.
183 if _is_binary_reader(stream, False):
184 return t.cast(t.BinaryIO, stream)
186 buf = getattr(stream, "buffer", None)
188 # Same situation here; this time we assume that the buffer is
189 # actually binary in case it's closed.
190 if buf is not None and _is_binary_reader(buf, True):
191 return t.cast(t.BinaryIO, buf)
193 return None
196def _find_binary_writer(stream: t.IO) -> t.Optional[t.BinaryIO]:
197 # We need to figure out if the given stream is already binary.
198 # This can happen because the official docs recommend detaching
199 # the streams to get binary streams. Some code might do this, so
200 # we need to deal with this case explicitly.
201 if _is_binary_writer(stream, False):
202 return t.cast(t.BinaryIO, stream)
204 buf = getattr(stream, "buffer", None)
206 # Same situation here; this time we assume that the buffer is
207 # actually binary in case it's closed.
208 if buf is not None and _is_binary_writer(buf, True):
209 return t.cast(t.BinaryIO, buf)
211 return None
214def _stream_is_misconfigured(stream: t.TextIO) -> bool:
215 """A stream is misconfigured if its encoding is ASCII."""
216 # If the stream does not have an encoding set, we assume it's set
217 # to ASCII. This appears to happen in certain unittest
218 # environments. It's not quite clear what the correct behavior is
219 # but this at least will force Click to recover somehow.
220 return is_ascii_encoding(getattr(stream, "encoding", None) or "ascii")
223def _is_compat_stream_attr(stream: t.TextIO, attr: str, value: t.Optional[str]) -> bool:
224 """A stream attribute is compatible if it is equal to the
225 desired value or the desired value is unset and the attribute
226 has a value.
227 """
228 stream_value = getattr(stream, attr, None)
229 return stream_value == value or (value is None and stream_value is not None)
232def _is_compatible_text_stream(
233 stream: t.TextIO, encoding: t.Optional[str], errors: t.Optional[str]
234) -> bool:
235 """Check if a stream's encoding and errors attributes are
236 compatible with the desired values.
237 """
238 return _is_compat_stream_attr(
239 stream, "encoding", encoding
240 ) and _is_compat_stream_attr(stream, "errors", errors)
243def _force_correct_text_stream(
244 text_stream: t.IO,
245 encoding: t.Optional[str],
246 errors: t.Optional[str],
247 is_binary: t.Callable[[t.IO, bool], bool],
248 find_binary: t.Callable[[t.IO], t.Optional[t.BinaryIO]],
249 force_readable: bool = False,
250 force_writable: bool = False,
251) -> t.TextIO:
252 if is_binary(text_stream, False):
253 binary_reader = t.cast(t.BinaryIO, text_stream)
254 else:
255 text_stream = t.cast(t.TextIO, text_stream)
256 # If the stream looks compatible, and won't default to a
257 # misconfigured ascii encoding, return it as-is.
258 if _is_compatible_text_stream(text_stream, encoding, errors) and not (
259 encoding is None and _stream_is_misconfigured(text_stream)
260 ):
261 return text_stream
263 # Otherwise, get the underlying binary reader.
264 possible_binary_reader = find_binary(text_stream)
266 # If that's not possible, silently use the original reader
267 # and get mojibake instead of exceptions.
268 if possible_binary_reader is None:
269 return text_stream
271 binary_reader = possible_binary_reader
273 # Default errors to replace instead of strict in order to get
274 # something that works.
275 if errors is None:
276 errors = "replace"
278 # Wrap the binary stream in a text stream with the correct
279 # encoding parameters.
280 return _make_text_stream(
281 binary_reader,
282 encoding,
283 errors,
284 force_readable=force_readable,
285 force_writable=force_writable,
286 )
289def _force_correct_text_reader(
290 text_reader: t.IO,
291 encoding: t.Optional[str],
292 errors: t.Optional[str],
293 force_readable: bool = False,
294) -> t.TextIO:
295 return _force_correct_text_stream(
296 text_reader,
297 encoding,
298 errors,
299 _is_binary_reader,
300 _find_binary_reader,
301 force_readable=force_readable,
302 )
305def _force_correct_text_writer(
306 text_writer: t.IO,
307 encoding: t.Optional[str],
308 errors: t.Optional[str],
309 force_writable: bool = False,
310) -> t.TextIO:
311 return _force_correct_text_stream(
312 text_writer,
313 encoding,
314 errors,
315 _is_binary_writer,
316 _find_binary_writer,
317 force_writable=force_writable,
318 )
321def get_binary_stdin() -> t.BinaryIO:
322 reader = _find_binary_reader(sys.stdin)
323 if reader is None:
324 raise RuntimeError("Was not able to determine binary stream for sys.stdin.")
325 return reader
328def get_binary_stdout() -> t.BinaryIO:
329 writer = _find_binary_writer(sys.stdout)
330 if writer is None:
331 raise RuntimeError("Was not able to determine binary stream for sys.stdout.")
332 return writer
335def get_binary_stderr() -> t.BinaryIO:
336 writer = _find_binary_writer(sys.stderr)
337 if writer is None:
338 raise RuntimeError("Was not able to determine binary stream for sys.stderr.")
339 return writer
342def get_text_stdin(
343 encoding: t.Optional[str] = None, errors: t.Optional[str] = None
344) -> t.TextIO:
345 rv = _get_windows_console_stream(sys.stdin, encoding, errors)
346 if rv is not None:
347 return rv
348 return _force_correct_text_reader(sys.stdin, encoding, errors, force_readable=True)
351def get_text_stdout(
352 encoding: t.Optional[str] = None, errors: t.Optional[str] = None
353) -> t.TextIO:
354 rv = _get_windows_console_stream(sys.stdout, encoding, errors)
355 if rv is not None:
356 return rv
357 return _force_correct_text_writer(sys.stdout, encoding, errors, force_writable=True)
360def get_text_stderr(
361 encoding: t.Optional[str] = None, errors: t.Optional[str] = None
362) -> t.TextIO:
363 rv = _get_windows_console_stream(sys.stderr, encoding, errors)
364 if rv is not None:
365 return rv
366 return _force_correct_text_writer(sys.stderr, encoding, errors, force_writable=True)
369def _wrap_io_open(
370 file: t.Union[str, os.PathLike, int],
371 mode: str,
372 encoding: t.Optional[str],
373 errors: t.Optional[str],
374) -> t.IO:
375 """Handles not passing ``encoding`` and ``errors`` in binary mode."""
376 if "b" in mode:
377 return open(file, mode)
379 return open(file, mode, encoding=encoding, errors=errors)
382def open_stream(
383 filename: str,
384 mode: str = "r",
385 encoding: t.Optional[str] = None,
386 errors: t.Optional[str] = "strict",
387 atomic: bool = False,
388) -> t.Tuple[t.IO, bool]:
389 binary = "b" in mode
391 # Standard streams first. These are simple because they ignore the
392 # atomic flag. Use fsdecode to handle Path("-").
393 if os.fsdecode(filename) == "-":
394 if any(m in mode for m in ["w", "a", "x"]):
395 if binary:
396 return get_binary_stdout(), False
397 return get_text_stdout(encoding=encoding, errors=errors), False
398 if binary:
399 return get_binary_stdin(), False
400 return get_text_stdin(encoding=encoding, errors=errors), False
402 # Non-atomic writes directly go out through the regular open functions.
403 if not atomic:
404 return _wrap_io_open(filename, mode, encoding, errors), True
406 # Some usability stuff for atomic writes
407 if "a" in mode:
408 raise ValueError(
409 "Appending to an existing file is not supported, because that"
410 " would involve an expensive `copy`-operation to a temporary"
411 " file. Open the file in normal `w`-mode and copy explicitly"
412 " if that's what you're after."
413 )
414 if "x" in mode:
415 raise ValueError("Use the `overwrite`-parameter instead.")
416 if "w" not in mode:
417 raise ValueError("Atomic writes only make sense with `w`-mode.")
419 # Atomic writes are more complicated. They work by opening a file
420 # as a proxy in the same folder and then using the fdopen
421 # functionality to wrap it in a Python file. Then we wrap it in an
422 # atomic file that moves the file over on close.
423 import errno
424 import random
426 try:
427 perm: t.Optional[int] = os.stat(filename).st_mode
428 except OSError:
429 perm = None
431 flags = os.O_RDWR | os.O_CREAT | os.O_EXCL
433 if binary:
434 flags |= getattr(os, "O_BINARY", 0)
436 while True:
437 tmp_filename = os.path.join(
438 os.path.dirname(filename),
439 f".__atomic-write{random.randrange(1 << 32):08x}",
440 )
441 try:
442 fd = os.open(tmp_filename, flags, 0o666 if perm is None else perm)
443 break
444 except OSError as e:
445 if e.errno == errno.EEXIST or (
446 os.name == "nt"
447 and e.errno == errno.EACCES
448 and os.path.isdir(e.filename)
449 and os.access(e.filename, os.W_OK)
450 ):
451 continue
452 raise
454 if perm is not None:
455 os.chmod(tmp_filename, perm) # in case perm includes bits in umask
457 f = _wrap_io_open(fd, mode, encoding, errors)
458 af = _AtomicFile(f, tmp_filename, os.path.realpath(filename))
459 return t.cast(t.IO, af), True
462class _AtomicFile:
463 def __init__(self, f: t.IO, tmp_filename: str, real_filename: str) -> None:
464 self._f = f
465 self._tmp_filename = tmp_filename
466 self._real_filename = real_filename
467 self.closed = False
469 @property
470 def name(self) -> str:
471 return self._real_filename
473 def close(self, delete: bool = False) -> None:
474 if self.closed:
475 return
476 self._f.close()
477 os.replace(self._tmp_filename, self._real_filename)
478 self.closed = True
480 def __getattr__(self, name: str) -> t.Any:
481 return getattr(self._f, name)
483 def __enter__(self) -> "_AtomicFile":
484 return self
486 def __exit__(self, exc_type, exc_value, tb): # type: ignore
487 self.close(delete=exc_type is not None)
489 def __repr__(self) -> str:
490 return repr(self._f)
493def strip_ansi(value: str) -> str:
494 return _ansi_re.sub("", value)
497def _is_jupyter_kernel_output(stream: t.IO) -> bool:
498 while isinstance(stream, (_FixupStream, _NonClosingTextIOWrapper)):
499 stream = stream._stream
501 return stream.__class__.__module__.startswith("ipykernel.")
504def should_strip_ansi(
505 stream: t.Optional[t.IO] = None, color: t.Optional[bool] = None
506) -> bool:
507 if color is None:
508 if stream is None:
509 stream = sys.stdin
510 return not isatty(stream) and not _is_jupyter_kernel_output(stream)
511 return not color
514# On Windows, wrap the output streams with colorama to support ANSI
515# color codes.
516# NOTE: double check is needed so mypy does not analyze this on Linux
517if sys.platform.startswith("win") and WIN: 517 ↛ 518line 517 didn't jump to line 518, because the condition on line 517 was never true
518 from ._winconsole import _get_windows_console_stream
520 def _get_argv_encoding() -> str:
521 import locale
523 return locale.getpreferredencoding()
525 _ansi_stream_wrappers: t.MutableMapping[t.TextIO, t.TextIO] = WeakKeyDictionary()
527 def auto_wrap_for_ansi(
528 stream: t.TextIO, color: t.Optional[bool] = None
529 ) -> t.TextIO:
530 """Support ANSI color and style codes on Windows by wrapping a
531 stream with colorama.
532 """
533 try:
534 cached = _ansi_stream_wrappers.get(stream)
535 except Exception:
536 cached = None
538 if cached is not None:
539 return cached
541 import colorama
543 strip = should_strip_ansi(stream, color)
544 ansi_wrapper = colorama.AnsiToWin32(stream, strip=strip)
545 rv = t.cast(t.TextIO, ansi_wrapper.stream)
546 _write = rv.write
548 def _safe_write(s):
549 try:
550 return _write(s)
551 except BaseException:
552 ansi_wrapper.reset_all()
553 raise
555 rv.write = _safe_write
557 try:
558 _ansi_stream_wrappers[stream] = rv
559 except Exception:
560 pass
562 return rv
564else:
566 def _get_argv_encoding() -> str:
567 return getattr(sys.stdin, "encoding", None) or get_filesystem_encoding()
569 def _get_windows_console_stream(
570 f: t.TextIO, encoding: t.Optional[str], errors: t.Optional[str]
571 ) -> t.Optional[t.TextIO]:
572 return None
575def term_len(x: str) -> int:
576 return len(strip_ansi(x))
579def isatty(stream: t.IO) -> bool:
580 try:
581 return stream.isatty()
582 except Exception:
583 return False
586def _make_cached_stream_func(
587 src_func: t.Callable[[], t.TextIO], wrapper_func: t.Callable[[], t.TextIO]
588) -> t.Callable[[], t.TextIO]:
589 cache: t.MutableMapping[t.TextIO, t.TextIO] = WeakKeyDictionary()
591 def func() -> t.TextIO:
592 stream = src_func()
593 try:
594 rv = cache.get(stream)
595 except Exception:
596 rv = None
597 if rv is not None:
598 return rv
599 rv = wrapper_func()
600 try:
601 cache[stream] = rv
602 except Exception:
603 pass
604 return rv
606 return func
609_default_text_stdin = _make_cached_stream_func(lambda: sys.stdin, get_text_stdin) 609 ↛ exitline 609 didn't run the lambda on line 609
610_default_text_stdout = _make_cached_stream_func(lambda: sys.stdout, get_text_stdout) 610 ↛ exitline 610 didn't run the lambda on line 610
611_default_text_stderr = _make_cached_stream_func(lambda: sys.stderr, get_text_stderr) 611 ↛ exitline 611 didn't run the lambda on line 611
614binary_streams: t.Mapping[str, t.Callable[[], t.BinaryIO]] = {
615 "stdin": get_binary_stdin,
616 "stdout": get_binary_stdout,
617 "stderr": get_binary_stderr,
618}
620text_streams: t.Mapping[
621 str, t.Callable[[t.Optional[str], t.Optional[str]], t.TextIO]
622] = {
623 "stdin": get_text_stdin,
624 "stdout": get_text_stdout,
625 "stderr": get_text_stderr,
626}