Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/pandas/io/common.py: 19%
422 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1"""Common IO api utilities"""
2from __future__ import annotations
4from abc import (
5 ABC,
6 abstractmethod,
7)
8import bz2
9import codecs
10import dataclasses
11import functools
12import gzip
13from io import (
14 BufferedIOBase,
15 BytesIO,
16 RawIOBase,
17 StringIO,
18 TextIOBase,
19 TextIOWrapper,
20)
21import mmap
22import os
23from pathlib import Path
24import re
25import tarfile
26from typing import (
27 IO,
28 Any,
29 AnyStr,
30 Generic,
31 Literal,
32 Mapping,
33 Sequence,
34 TypeVar,
35 cast,
36 overload,
37)
38from urllib.parse import (
39 urljoin,
40 urlparse as parse_url,
41 uses_netloc,
42 uses_params,
43 uses_relative,
44)
45import warnings
46import zipfile
48from pandas._typing import (
49 BaseBuffer,
50 CompressionDict,
51 CompressionOptions,
52 FilePath,
53 ReadBuffer,
54 StorageOptions,
55 WriteBuffer,
56)
57from pandas.compat import get_lzma_file
58from pandas.compat._optional import import_optional_dependency
59from pandas.util._decorators import doc
60from pandas.util._exceptions import find_stack_level
62from pandas.core.dtypes.common import (
63 is_bool,
64 is_file_like,
65 is_integer,
66 is_list_like,
67)
69from pandas.core.shared_docs import _shared_docs
71_VALID_URLS = set(uses_relative + uses_netloc + uses_params)
72_VALID_URLS.discard("")
73_RFC_3986_PATTERN = re.compile(r"^[A-Za-z][A-Za-z0-9+\-+.]*://")
75BaseBufferT = TypeVar("BaseBufferT", bound=BaseBuffer)
@dataclasses.dataclass
class IOArgs:
    """
    Return value of io/common.py:_get_filepath_or_buffer.
    """

    # resolved path, or an open buffer (URLs / fsspec paths become buffers)
    filepath_or_buffer: str | BaseBuffer
    # text encoding to use when opening the handle
    encoding: str
    # final mode string (a "b" may have been appended by the resolver)
    mode: str
    # normalized compression dict; always carries a "method" key
    compression: CompressionDict
    # True when filepath_or_buffer was opened here and must be closed by us
    should_close: bool = False
@dataclasses.dataclass
class IOHandles(Generic[AnyStr]):
    """
    Return value of io/common.py:get_handle

    Can be used as a context manager.

    This is used to easily close created buffers and to handle corner cases when
    TextIOWrapper is inserted.

    handle: The file handle to be used.
    created_handles: All file handles that are created by get_handle
    is_wrapped: Whether a TextIOWrapper needs to be detached.
    """

    # handle might not implement the IO-interface
    handle: IO[AnyStr]
    compression: CompressionDict
    created_handles: list[IO[bytes] | IO[str]] = dataclasses.field(default_factory=list)
    is_wrapped: bool = False

    def close(self) -> None:
        """
        Close all created buffers.

        Note: If a TextIOWrapper was inserted, it is flushed and detached to
        avoid closing the potentially user-created buffer.
        """
        if self.is_wrapped:
            assert isinstance(self.handle, TextIOWrapper)
            # flush pending text, then detach so the underlying (possibly
            # user-owned) buffer is NOT closed together with the wrapper
            self.handle.flush()
            self.handle.detach()
            self.created_handles.remove(self.handle)
        for handle in self.created_handles:
            handle.close()
        self.created_handles = []
        self.is_wrapped = False

    def __enter__(self) -> IOHandles[AnyStr]:
        return self

    def __exit__(self, *args: Any) -> None:
        # leaving the context simply closes everything created here
        self.close()
def is_url(url: object) -> bool:
    """
    Check to see if a URL has a valid protocol.

    Parameters
    ----------
    url : str or unicode

    Returns
    -------
    isurl : bool
        If `url` has a valid protocol return True otherwise False.
    """
    # only strings can be URLs; everything else is immediately rejected
    if isinstance(url, str):
        return parse_url(url).scheme in _VALID_URLS
    return False
154@overload
155def _expand_user(filepath_or_buffer: str) -> str:
156 ...
159@overload
160def _expand_user(filepath_or_buffer: BaseBufferT) -> BaseBufferT:
161 ...
164def _expand_user(filepath_or_buffer: str | BaseBufferT) -> str | BaseBufferT:
165 """
166 Return the argument with an initial component of ~ or ~user
167 replaced by that user's home directory.
169 Parameters
170 ----------
171 filepath_or_buffer : object to be converted if possible
173 Returns
174 -------
175 expanded_filepath_or_buffer : an expanded filepath or the
176 input if not expandable
177 """
178 if isinstance(filepath_or_buffer, str):
179 return os.path.expanduser(filepath_or_buffer)
180 return filepath_or_buffer
def validate_header_arg(header: object) -> None:
    """
    Validate the ``header`` argument of the read_* functions.

    Accepted values are ``None``, a non-negative integer, or a list-like of
    non-negative integers.

    Raises
    ------
    ValueError
        For a negative integer, a list with non-integer or negative entries,
        or any other unsupported value.
    TypeError
        When a bool is passed.
    """
    if header is None:
        return
    if is_integer(header):
        if cast(int, header) < 0:
            # GH 27779
            raise ValueError(
                "Passing negative integer to header is invalid. "
                "For no header, use header=None instead"
            )
        return
    if is_list_like(header, allow_sets=False):
        rows = cast(Sequence, header)
        if any(not is_integer(row) for row in rows):
            raise ValueError("header must be integer or list of integers")
        if any(row < 0 for row in rows):
            raise ValueError("cannot specify multi-index header with negative integers")
        return
    if is_bool(header):
        raise TypeError(
            "Passing a bool to header is invalid. Use header=None for no header or "
            "header=int or list-like of ints to specify "
            "the row(s) making up the column names"
        )
    # GH 16338
    raise ValueError("header must be integer or list of integers")
@overload
def stringify_path(filepath_or_buffer: FilePath, convert_file_like: bool = ...) -> str:
    ...


@overload
def stringify_path(
    filepath_or_buffer: BaseBufferT, convert_file_like: bool = ...
) -> BaseBufferT:
    ...


def stringify_path(
    filepath_or_buffer: FilePath | BaseBufferT,
    convert_file_like: bool = False,
) -> str | BaseBufferT:
    """
    Attempt to convert a path-like object to a string.

    Parameters
    ----------
    filepath_or_buffer : object to be converted

    Returns
    -------
    str_filepath_or_buffer : maybe a string version of the object

    Notes
    -----
    Objects supporting the fspath protocol are coerced according to their
    __fspath__ method.

    Any other object is passed through unchanged, which includes bytes,
    strings, buffers, or anything else that's not even path-like.
    """
    if not convert_file_like and is_file_like(filepath_or_buffer):
        # GH 38125: some fsspec objects implement os.PathLike but have already opened a
        # file. This prevents opening the file a second time. infer_compression calls
        # this function with convert_file_like=True to infer the compression.
        return cast(BaseBufferT, filepath_or_buffer)

    if isinstance(filepath_or_buffer, os.PathLike):
        return _expand_user(filepath_or_buffer.__fspath__())
    return _expand_user(filepath_or_buffer)
def urlopen(*args, **kwargs):
    """
    Lazy-import wrapper for stdlib urlopen, as that imports a big chunk of
    the stdlib.
    """
    # deferred import keeps "import pandas" cheap
    from urllib.request import urlopen as _urlopen

    return _urlopen(*args, **kwargs)
def is_fsspec_url(url: FilePath | BaseBuffer) -> bool:
    """
    Returns true if the given URL looks like
    something fsspec can handle
    """
    if not isinstance(url, str):
        return False
    if url.startswith(("http://", "https://")):
        # plain HTTP(S) is handled through urllib, not fsspec
        return False
    return _RFC_3986_PATTERN.match(url) is not None
@doc(
    storage_options=_shared_docs["storage_options"],
    compression_options=_shared_docs["compression_options"] % "filepath_or_buffer",
)
def _get_filepath_or_buffer(
    filepath_or_buffer: FilePath | BaseBuffer,
    encoding: str = "utf-8",
    compression: CompressionOptions = None,
    mode: str = "r",
    storage_options: StorageOptions = None,
) -> IOArgs:
    """
    If the filepath_or_buffer is a url, translate and return the buffer.
    Otherwise passthrough.

    Parameters
    ----------
    filepath_or_buffer : a url, filepath (str, py.path.local or pathlib.Path),
        or buffer
    {compression_options}

        .. versionchanged:: 1.4.0 Zstandard support.

    encoding : the encoding to use to decode bytes, default is 'utf-8'
    mode : str, optional
    {storage_options}

        .. versionadded:: 1.2.0

    .. versionchanged:: 1.2.0

        Returns the dataclass IOArgs.
    """
    filepath_or_buffer = stringify_path(filepath_or_buffer)

    # handle compression dict
    compression_method, compression = get_compression_method(compression)
    compression_method = infer_compression(filepath_or_buffer, compression_method)

    # GH21227 internal compression is not used for non-binary handles.
    if compression_method and hasattr(filepath_or_buffer, "write") and "b" not in mode:
        warnings.warn(
            "compression has no effect when passing a non-binary object as input.",
            RuntimeWarning,
            stacklevel=find_stack_level(),
        )
        compression_method = None

    compression = dict(compression, method=compression_method)

    # bz2 and xz do not write the byte order mark for utf-16 and utf-32
    # print a warning when writing such files
    if (
        "w" in mode
        and compression_method in ["bz2", "xz"]
        and encoding in ["utf-16", "utf-32"]
    ):
        warnings.warn(
            f"{compression} will not write the byte order mark for {encoding}",
            UnicodeWarning,
            stacklevel=find_stack_level(),
        )

    # Use binary mode when converting path-like objects to file-like objects (fsspec)
    # except when text mode is explicitly requested. The original mode is returned if
    # fsspec is not used.
    fsspec_mode = mode
    if "t" not in fsspec_mode and "b" not in fsspec_mode:
        fsspec_mode += "b"

    if isinstance(filepath_or_buffer, str) and is_url(filepath_or_buffer):
        # TODO: fsspec can also handle HTTP via requests, but leaving this
        # unchanged. using fsspec appears to break the ability to infer if the
        # server responded with gzipped data
        storage_options = storage_options or {}

        # waiting until now for importing to match intended lazy logic of
        # urlopen function defined elsewhere in this module
        import urllib.request

        # assuming storage_options is to be interpreted as headers
        req_info = urllib.request.Request(filepath_or_buffer, headers=storage_options)
        with urlopen(req_info) as req:
            content_encoding = req.headers.get("Content-Encoding", None)
            if content_encoding == "gzip":
                # Override compression based on Content-Encoding header
                compression = {"method": "gzip"}
            # the whole response body is read into memory here
            reader = BytesIO(req.read())
        return IOArgs(
            filepath_or_buffer=reader,
            encoding=encoding,
            compression=compression,
            should_close=True,
            mode=fsspec_mode,
        )

    if is_fsspec_url(filepath_or_buffer):
        assert isinstance(
            filepath_or_buffer, str
        )  # just to appease mypy for this branch
        # two special-case s3-like protocols; these have special meaning in Hadoop,
        # but are equivalent to just "s3" from fsspec's point of view
        # cc #11071
        if filepath_or_buffer.startswith("s3a://"):
            filepath_or_buffer = filepath_or_buffer.replace("s3a://", "s3://")
        if filepath_or_buffer.startswith("s3n://"):
            filepath_or_buffer = filepath_or_buffer.replace("s3n://", "s3://")
        fsspec = import_optional_dependency("fsspec")

        # If botocore is installed we fallback to reading with anon=True
        # to allow reads from public buckets
        err_types_to_retry_with_anon: list[Any] = []
        try:
            import_optional_dependency("botocore")
            from botocore.exceptions import (
                ClientError,
                NoCredentialsError,
            )

            err_types_to_retry_with_anon = [
                ClientError,
                NoCredentialsError,
                PermissionError,
            ]
        except ImportError:
            pass

        try:
            file_obj = fsspec.open(
                filepath_or_buffer, mode=fsspec_mode, **(storage_options or {})
            ).open()
        # GH 34626 Reads from Public Buckets without Credentials needs anon=True
        except tuple(err_types_to_retry_with_anon):
            if storage_options is None:
                storage_options = {"anon": True}
            else:
                # don't mutate user input.
                storage_options = dict(storage_options)
                storage_options["anon"] = True
            file_obj = fsspec.open(
                filepath_or_buffer, mode=fsspec_mode, **(storage_options or {})
            ).open()

        return IOArgs(
            filepath_or_buffer=file_obj,
            encoding=encoding,
            compression=compression,
            should_close=True,
            mode=fsspec_mode,
        )
    elif storage_options:
        # storage_options only make sense for URLs / fsspec paths
        raise ValueError(
            "storage_options passed with file object or non-fsspec file path"
        )

    if isinstance(filepath_or_buffer, (str, bytes, mmap.mmap)):
        return IOArgs(
            filepath_or_buffer=_expand_user(filepath_or_buffer),
            encoding=encoding,
            compression=compression,
            should_close=False,
            mode=mode,
        )

    # is_file_like requires (read | write) & __iter__ but __iter__ is only
    # needed for read_csv(engine=python)
    if not (
        hasattr(filepath_or_buffer, "read") or hasattr(filepath_or_buffer, "write")
    ):
        msg = f"Invalid file path or buffer object type: {type(filepath_or_buffer)}"
        raise ValueError(msg)

    return IOArgs(
        filepath_or_buffer=filepath_or_buffer,
        encoding=encoding,
        compression=compression,
        should_close=False,
        mode=mode,
    )
def file_path_to_url(path: str) -> str:
    """
    converts an absolute native path to a FILE URL.

    Parameters
    ----------
    path : a path in native format

    Returns
    -------
    a valid FILE URL
    """
    # lazify expensive import (~30ms)
    from urllib.request import pathname2url

    url_path = pathname2url(path)
    return urljoin("file:", url_path)
# Filename extension -> compression method.  Insertion order matters:
# infer_compression returns the first ``endswith`` match, so the compound
# ".tar.*" extensions must precede the bare ".gz"/".bz2"/".xz" entries.
_extension_to_compression = {
    ".tar": "tar",
    ".tar.gz": "tar",
    ".tar.bz2": "tar",
    ".tar.xz": "tar",
    ".gz": "gzip",
    ".bz2": "bz2",
    ".zip": "zip",
    ".xz": "xz",
    ".zst": "zstd",
}
# every compression method name accepted by get_handle
_supported_compressions = set(_extension_to_compression.values())
def get_compression_method(
    compression: CompressionOptions,
) -> tuple[str | None, CompressionDict]:
    """
    Simplifies a compression argument to a compression method string and
    a mapping containing additional arguments.

    Parameters
    ----------
    compression : str or mapping
        If string, specifies the compression method. If mapping, value at key
        'method' specifies compression method.

    Returns
    -------
    tuple of (compression method : str or None, compression arguments : dict)

    Raises
    ------
    ValueError on mapping missing 'method' key
    """
    if not isinstance(compression, Mapping):
        # plain string (or None): no extra arguments to forward
        return compression, {}

    args = dict(compression)
    try:
        method: str | None = args.pop("method")
    except KeyError as err:
        raise ValueError("If mapping, compression must have key 'method'") from err
    return method, args
@doc(compression_options=_shared_docs["compression_options"] % "filepath_or_buffer")
def infer_compression(
    filepath_or_buffer: FilePath | BaseBuffer, compression: str | None
) -> str | None:
    """
    Get the compression method for filepath_or_buffer. If compression='infer',
    the inferred compression method is returned. Otherwise, the input
    compression method is returned unchanged, unless it's invalid, in which
    case an error is raised.

    Parameters
    ----------
    filepath_or_buffer : str or file handle
        File path or object.
    {compression_options}

        .. versionchanged:: 1.4.0 Zstandard support.

    Returns
    -------
    string or None

    Raises
    ------
    ValueError on invalid compression specified.
    """
    if compression is None:
        return None

    if compression == "infer":
        # Convert all path types (e.g. pathlib.Path) to strings
        path = stringify_path(filepath_or_buffer, convert_file_like=True)
        if not isinstance(path, str):
            # Cannot infer compression of a buffer, assume no compression
            return None

        # match the filename/URL extension against the known ones; the
        # compound ".tar.*" entries come first in the table, so they win
        lowered = path.lower()
        for extension, method in _extension_to_compression.items():
            if lowered.endswith(extension):
                return method
        return None

    # Compression has been specified explicitly. Check that it's valid
    if compression in _supported_compressions:
        return compression

    valid = ["infer", None] + sorted(_supported_compressions)
    msg = (
        f"Unrecognized compression type: {compression}\n"
        f"Valid compression types are {valid}"
    )
    raise ValueError(msg)
def check_parent_directory(path: Path | str) -> None:
    """
    Check if parent directory of a file exists, raise OSError if it does not

    Parameters
    ----------
    path: Path or str
        Path to check parent directory of
    """
    parent = Path(path).parent
    if parent.is_dir():
        return
    raise OSError(rf"Cannot save file into a non-existent directory: '{parent}'")
# The @overload stubs below pin get_handle's return type for the type
# checker: is_text=False yields IOHandles[bytes]; is_text=True (the
# default) yields IOHandles[str]; an arbitrary bool falls back to the union.
@overload
def get_handle(
    path_or_buf: FilePath | BaseBuffer,
    mode: str,
    *,
    encoding: str | None = ...,
    compression: CompressionOptions = ...,
    memory_map: bool = ...,
    is_text: Literal[False],
    errors: str | None = ...,
    storage_options: StorageOptions = ...,
) -> IOHandles[bytes]:
    ...


@overload
def get_handle(
    path_or_buf: FilePath | BaseBuffer,
    mode: str,
    *,
    encoding: str | None = ...,
    compression: CompressionOptions = ...,
    memory_map: bool = ...,
    is_text: Literal[True] = ...,
    errors: str | None = ...,
    storage_options: StorageOptions = ...,
) -> IOHandles[str]:
    ...


@overload
def get_handle(
    path_or_buf: FilePath | BaseBuffer,
    mode: str,
    *,
    encoding: str | None = ...,
    compression: CompressionOptions = ...,
    memory_map: bool = ...,
    is_text: bool = ...,
    errors: str | None = ...,
    storage_options: StorageOptions = ...,
) -> IOHandles[str] | IOHandles[bytes]:
    ...
@doc(compression_options=_shared_docs["compression_options"] % "path_or_buf")
def get_handle(
    path_or_buf: FilePath | BaseBuffer,
    mode: str,
    *,
    encoding: str | None = None,
    compression: CompressionOptions = None,
    memory_map: bool = False,
    is_text: bool = True,
    errors: str | None = None,
    storage_options: StorageOptions = None,
) -> IOHandles[str] | IOHandles[bytes]:
    """
    Get file handle for given path/buffer and mode.

    Parameters
    ----------
    path_or_buf : str or file handle
        File path or object.
    mode : str
        Mode to open path_or_buf with.
    encoding : str or None
        Encoding to use.
    {compression_options}

        .. versionchanged:: 1.0.0
           May now be a dict with key 'method' as compression mode
           and other keys as compression options if compression
           mode is 'zip'.

        .. versionchanged:: 1.1.0
           Passing compression options as keys in dict is now
           supported for compression modes 'gzip', 'bz2', 'zstd' and 'zip'.

        .. versionchanged:: 1.4.0 Zstandard support.

    memory_map : bool, default False
        See parsers._parser_params for more information. Only used by read_csv.
    is_text : bool, default True
        Whether the type of the content passed to the file/buffer is string or
        bytes. This is not the same as `"b" not in mode`. If a string content is
        passed to a binary file/buffer, a wrapper is inserted.
    errors : str, default 'strict'
        Specifies how encoding and decoding errors are to be handled.
        See the errors argument for :func:`open` for a full list
        of options.
    storage_options: StorageOptions = None
        Passed to _get_filepath_or_buffer

    .. versionchanged:: 1.2.0

    Returns the dataclass IOHandles
    """
    # Windows does not default to utf-8. Set to utf-8 for a consistent behavior
    encoding = encoding or "utf-8"

    errors = errors or "strict"

    # read_csv does not know whether the buffer is opened in binary/text mode
    if _is_binary_mode(path_or_buf, mode) and "b" not in mode:
        mode += "b"

    # validate encoding and errors
    codecs.lookup(encoding)
    if isinstance(errors, str):
        codecs.lookup_error(errors)

    # open URLs
    ioargs = _get_filepath_or_buffer(
        path_or_buf,
        encoding=encoding,
        compression=compression,
        mode=mode,
        storage_options=storage_options,
    )

    handle = ioargs.filepath_or_buffer
    handles: list[BaseBuffer]

    # memory mapping needs to be the first step
    # only used for read_csv
    handle, memory_map, handles = _maybe_memory_map(handle, memory_map)

    is_path = isinstance(handle, str)
    compression_args = dict(ioargs.compression)
    compression = compression_args.pop("method")

    # Only for write methods
    if "r" not in mode and is_path:
        check_parent_directory(str(handle))

    if compression:
        if compression != "zstd":
            # compression libraries do not like an explicit text-mode
            ioargs.mode = ioargs.mode.replace("t", "")
        elif compression == "zstd" and "b" not in ioargs.mode:
            # python-zstandard defaults to text mode, but we always expect
            # compression libraries to use binary mode.
            ioargs.mode += "b"

        # GZ Compression
        if compression == "gzip":
            if isinstance(handle, str):
                # error: Incompatible types in assignment (expression has type
                # "GzipFile", variable has type "Union[str, BaseBuffer]")
                handle = gzip.GzipFile(  # type: ignore[assignment]
                    filename=handle,
                    mode=ioargs.mode,
                    **compression_args,
                )
            else:
                handle = gzip.GzipFile(
                    # No overload variant of "GzipFile" matches argument types
                    # "Union[str, BaseBuffer]", "str", "Dict[str, Any]"
                    fileobj=handle,  # type: ignore[call-overload]
                    mode=ioargs.mode,
                    **compression_args,
                )

        # BZ Compression
        elif compression == "bz2":
            # No overload variant of "BZ2File" matches argument types
            # "Union[str, BaseBuffer]", "str", "Dict[str, Any]"
            handle = bz2.BZ2File(  # type: ignore[call-overload]
                handle,
                mode=ioargs.mode,
                **compression_args,
            )

        # ZIP Compression
        elif compression == "zip":
            # error: Argument 1 to "_BytesZipFile" has incompatible type
            # "Union[str, BaseBuffer]"; expected "Union[Union[str, PathLike[str]],
            # ReadBuffer[bytes], WriteBuffer[bytes]]"
            handle = _BytesZipFile(
                handle, ioargs.mode, **compression_args  # type: ignore[arg-type]
            )
            # reading requires exactly one member in the archive
            if handle.buffer.mode == "r":
                handles.append(handle)
                zip_names = handle.buffer.namelist()
                if len(zip_names) == 1:
                    handle = handle.buffer.open(zip_names.pop())
                elif not zip_names:
                    raise ValueError(f"Zero files found in ZIP file {path_or_buf}")
                else:
                    raise ValueError(
                        "Multiple files found in ZIP file. "
                        f"Only one file per ZIP: {zip_names}"
                    )

        # TAR Encoding
        elif compression == "tar":
            compression_args.setdefault("mode", ioargs.mode)
            if isinstance(handle, str):
                handle = _BytesTarFile(name=handle, **compression_args)
            else:
                # error: Argument "fileobj" to "_BytesTarFile" has incompatible
                # type "BaseBuffer"; expected "Union[ReadBuffer[bytes],
                # WriteBuffer[bytes], None]"
                handle = _BytesTarFile(
                    fileobj=handle, **compression_args  # type: ignore[arg-type]
                )
            assert isinstance(handle, _BytesTarFile)
            # reading requires exactly one member in the archive
            if "r" in handle.buffer.mode:
                handles.append(handle)
                files = handle.buffer.getnames()
                if len(files) == 1:
                    file = handle.buffer.extractfile(files[0])
                    assert file is not None
                    handle = file
                elif not files:
                    raise ValueError(f"Zero files found in TAR archive {path_or_buf}")
                else:
                    raise ValueError(
                        "Multiple files found in TAR archive. "
                        f"Only one file per TAR archive: {files}"
                    )

        # XZ Compression
        elif compression == "xz":
            # error: Argument 1 to "LZMAFile" has incompatible type "Union[str,
            # BaseBuffer]"; expected "Optional[Union[Union[str, bytes, PathLike[str],
            # PathLike[bytes]], IO[bytes]]]"
            handle = get_lzma_file()(handle, ioargs.mode)  # type: ignore[arg-type]

        # Zstd Compression
        elif compression == "zstd":
            zstd = import_optional_dependency("zstandard")
            if "r" in ioargs.mode:
                open_args = {"dctx": zstd.ZstdDecompressor(**compression_args)}
            else:
                open_args = {"cctx": zstd.ZstdCompressor(**compression_args)}
            handle = zstd.open(
                handle,
                mode=ioargs.mode,
                **open_args,
            )

        # Unrecognized Compression
        else:
            msg = f"Unrecognized compression type: {compression}"
            raise ValueError(msg)

        assert not isinstance(handle, str)
        handles.append(handle)

    elif isinstance(handle, str):
        # Check whether the filename is to be opened in binary mode.
        # Binary mode does not support 'encoding' and 'newline'.
        if ioargs.encoding and "b" not in ioargs.mode:
            # Encoding
            handle = open(
                handle,
                ioargs.mode,
                encoding=ioargs.encoding,
                errors=errors,
                newline="",
            )
        else:
            # Binary mode
            handle = open(handle, ioargs.mode)
        handles.append(handle)

    # Convert BytesIO or file objects passed with an encoding
    is_wrapped = False
    if not is_text and ioargs.mode == "rb" and isinstance(handle, TextIOBase):
        # not added to handles as it does not open/buffer resources
        handle = _BytesIOWrapper(
            handle,
            encoding=ioargs.encoding,
        )
    elif is_text and (
        compression or memory_map or _is_binary_mode(handle, ioargs.mode)
    ):
        if (
            not hasattr(handle, "readable")
            or not hasattr(handle, "writable")
            or not hasattr(handle, "seekable")
        ):
            # TextIOWrapper insists on the *-able probes; add them if missing
            handle = _IOWrapper(handle)
        # error: Argument 1 to "TextIOWrapper" has incompatible type
        # "_IOWrapper"; expected "IO[bytes]"
        handle = TextIOWrapper(
            handle,  # type: ignore[arg-type]
            encoding=ioargs.encoding,
            errors=errors,
            newline="",
        )
        handles.append(handle)
        # only marked as wrapped when the caller provided a handle
        is_wrapped = not (
            isinstance(ioargs.filepath_or_buffer, str) or ioargs.should_close
        )

    if "r" in ioargs.mode and not hasattr(handle, "read"):
        raise TypeError(
            "Expected file path name or file-like object, "
            f"got {type(ioargs.filepath_or_buffer)} type"
        )

    handles.reverse()  # close the most recently added buffer first
    if ioargs.should_close:
        assert not isinstance(ioargs.filepath_or_buffer, str)
        handles.append(ioargs.filepath_or_buffer)

    return IOHandles(
        # error: Argument "handle" to "IOHandles" has incompatible type
        # "Union[TextIOWrapper, GzipFile, BaseBuffer, typing.IO[bytes],
        # typing.IO[Any]]"; expected "pandas._typing.IO[Any]"
        handle=handle,  # type: ignore[arg-type]
        # error: Argument "created_handles" to "IOHandles" has incompatible type
        # "List[BaseBuffer]"; expected "List[Union[IO[bytes], IO[str]]]"
        created_handles=handles,  # type: ignore[arg-type]
        is_wrapped=is_wrapped,
        compression=ioargs.compression,
    )
923# error: Definition of "__enter__" in base class "IOBase" is incompatible
924# with definition in base class "BinaryIO"
925class _BufferedWriter(BytesIO, ABC): # type: ignore[misc]
926 """
927 Some objects do not support multiple .write() calls (TarFile and ZipFile).
928 This wrapper writes to the underlying buffer on close.
929 """
931 @abstractmethod
932 def write_to_buffer(self) -> None:
933 ...
935 def close(self) -> None:
936 if self.closed:
937 # already closed
938 return
939 if self.getvalue():
940 # write to buffer
941 self.seek(0)
942 # error: "_BufferedWriter" has no attribute "buffer"
943 with self.buffer: # type: ignore[attr-defined]
944 self.write_to_buffer()
945 else:
946 # error: "_BufferedWriter" has no attribute "buffer"
947 self.buffer.close() # type: ignore[attr-defined]
948 super().close()
class _BytesTarFile(_BufferedWriter):
    """Buffered wrapper around ``tarfile.TarFile`` holding a single member."""

    def __init__(
        self,
        name: str | None = None,
        mode: Literal["r", "a", "w", "x"] = "r",
        fileobj: ReadBuffer[bytes] | WriteBuffer[bytes] | None = None,
        archive_name: str | None = None,
        **kwargs,
    ) -> None:
        super().__init__()
        self.archive_name = archive_name
        self.name = name
        # error: Argument "fileobj" to "open" of "TarFile" has incompatible
        # type "Union[ReadBuffer[bytes], WriteBuffer[bytes], None]"; expected
        # "Optional[IO[bytes]]"
        self.buffer = tarfile.TarFile.open(
            name=name,
            mode=self.extend_mode(mode),
            fileobj=fileobj,  # type: ignore[arg-type]
            **kwargs,
        )

    def extend_mode(self, mode: str) -> str:
        """For write mode, append the tarfile compression suffix (":gz" etc.)."""
        mode = mode.replace("b", "")
        if mode != "w" or self.name is None:
            return mode
        suffix = Path(self.name).suffix
        if suffix in (".gz", ".xz", ".bz2"):
            return f"{mode}:{suffix[1:]}"
        return mode

    def infer_filename(self) -> str | None:
        """
        If an explicit archive_name is not given, we still want the file inside the zip
        file not to be named something.tar, because that causes confusion (GH39465).
        """
        if self.name is None:
            return None

        filename = Path(self.name)
        if filename.suffix == ".tar":
            return filename.with_suffix("").name
        # NOTE(review): Path.suffix only yields the last extension (".gz" for
        # "x.tar.gz"), so this branch appears unreachable — confirm intent.
        if filename.suffix in (".tar.gz", ".tar.bz2", ".tar.xz"):
            return filename.with_suffix("").with_suffix("").name
        return filename.name

    def write_to_buffer(self) -> None:
        # TarFile needs a non-empty string
        archive_name = self.archive_name or self.infer_filename() or "tar"
        tarinfo = tarfile.TarInfo(name=archive_name)
        tarinfo.size = len(self.getvalue())
        self.buffer.addfile(tarinfo, self)
class _BytesZipFile(_BufferedWriter):
    """Buffered wrapper around ``zipfile.ZipFile`` writing one member on close."""

    def __init__(
        self,
        file: FilePath | ReadBuffer[bytes] | WriteBuffer[bytes],
        mode: str,
        archive_name: str | None = None,
        **kwargs,
    ) -> None:
        super().__init__()
        # ZipFile mode strings never carry "b"
        mode = mode.replace("b", "")
        self.archive_name = archive_name

        kwargs.setdefault("compression", zipfile.ZIP_DEFLATED)
        # error: Argument 1 to "ZipFile" has incompatible type "Union[
        # Union[str, PathLike[str]], ReadBuffer[bytes], WriteBuffer[bytes]]";
        # expected "Union[Union[str, PathLike[str]], IO[bytes]]"
        self.buffer = zipfile.ZipFile(file, mode, **kwargs)  # type: ignore[arg-type]

    def infer_filename(self) -> str | None:
        """
        If an explicit archive_name is not given, we still want the file inside the zip
        file not to be named something.zip, because that causes confusion (GH39465).
        """
        if not isinstance(self.buffer.filename, (os.PathLike, str)):
            return None
        filename = Path(self.buffer.filename)
        if filename.suffix == ".zip":
            return filename.with_suffix("").name
        return filename.name

    def write_to_buffer(self) -> None:
        # ZipFile needs a non-empty string
        archive_name = self.archive_name or self.infer_filename() or "zip"
        self.buffer.writestr(archive_name, self.getvalue())
1042class _IOWrapper:
1043 # TextIOWrapper is overly strict: it request that the buffer has seekable, readable,
1044 # and writable. If we have a read-only buffer, we shouldn't need writable and vice
1045 # versa. Some buffers, are seek/read/writ-able but they do not have the "-able"
1046 # methods, e.g., tempfile.SpooledTemporaryFile.
1047 # If a buffer does not have the above "-able" methods, we simple assume they are
1048 # seek/read/writ-able.
1049 def __init__(self, buffer: BaseBuffer) -> None:
1050 self.buffer = buffer
1052 def __getattr__(self, name: str):
1053 return getattr(self.buffer, name)
1055 def readable(self) -> bool:
1056 if hasattr(self.buffer, "readable"):
1057 # error: "BaseBuffer" has no attribute "readable"
1058 return self.buffer.readable() # type: ignore[attr-defined]
1059 return True
1061 def seekable(self) -> bool:
1062 if hasattr(self.buffer, "seekable"):
1063 return self.buffer.seekable()
1064 return True
1066 def writable(self) -> bool:
1067 if hasattr(self.buffer, "writable"):
1068 # error: "BaseBuffer" has no attribute "writable"
1069 return self.buffer.writable() # type: ignore[attr-defined]
1070 return True
1073class _BytesIOWrapper:
1074 # Wrapper that wraps a StringIO buffer and reads bytes from it
1075 # Created for compat with pyarrow read_csv
1076 def __init__(self, buffer: StringIO | TextIOBase, encoding: str = "utf-8") -> None:
1077 self.buffer = buffer
1078 self.encoding = encoding
1079 # Because a character can be represented by more than 1 byte,
1080 # it is possible that reading will produce more bytes than n
1081 # We store the extra bytes in this overflow variable, and append the
1082 # overflow to the front of the bytestring the next time reading is performed
1083 self.overflow = b""
1085 def __getattr__(self, attr: str):
1086 return getattr(self.buffer, attr)
1088 def read(self, n: int | None = -1) -> bytes:
1089 assert self.buffer is not None
1090 bytestring = self.buffer.read(n).encode(self.encoding)
1091 # When n=-1/n greater than remaining bytes: Read entire file/rest of file
1092 combined_bytestring = self.overflow + bytestring
1093 if n is None or n < 0 or n >= len(combined_bytestring):
1094 self.overflow = b""
1095 return combined_bytestring
1096 else:
1097 to_return = combined_bytestring[:n]
1098 self.overflow = combined_bytestring[n:]
1099 return to_return
1102def _maybe_memory_map(
1103 handle: str | BaseBuffer, memory_map: bool
1104) -> tuple[str | BaseBuffer, bool, list[BaseBuffer]]:
1105 """Try to memory map file/buffer."""
1106 handles: list[BaseBuffer] = []
1107 memory_map &= hasattr(handle, "fileno") or isinstance(handle, str)
1108 if not memory_map:
1109 return handle, memory_map, handles
1111 # need to open the file first
1112 if isinstance(handle, str):
1113 handle = open(handle, "rb")
1114 handles.append(handle)
1116 try:
1117 # open mmap and adds *-able
1118 # error: Argument 1 to "_IOWrapper" has incompatible type "mmap";
1119 # expected "BaseBuffer"
1120 wrapped = _IOWrapper(
1121 mmap.mmap(
1122 handle.fileno(), 0, access=mmap.ACCESS_READ # type: ignore[arg-type]
1123 )
1124 )
1125 finally:
1126 for handle in reversed(handles):
1127 # error: "BaseBuffer" has no attribute "close"
1128 handle.close() # type: ignore[attr-defined]
1130 return wrapped, memory_map, [wrapped]
def file_exists(filepath_or_buffer: FilePath | BaseBuffer) -> bool:
    """Test whether file exists."""
    path = stringify_path(filepath_or_buffer)
    if not isinstance(path, str):
        # buffers are never "existing files"
        return False
    try:
        return os.path.exists(path)
    # gh-5874: if the filepath is too long will raise here
    except (TypeError, ValueError):
        return False
1147def _is_binary_mode(handle: FilePath | BaseBuffer, mode: str) -> bool:
1148 """Whether the handle is opened in binary mode"""
1149 # specified by user
1150 if "t" in mode or "b" in mode:
1151 return "b" in mode
1153 # exceptions
1154 text_classes = (
1155 # classes that expect string but have 'b' in mode
1156 codecs.StreamWriter,
1157 codecs.StreamReader,
1158 codecs.StreamReaderWriter,
1159 )
1160 if issubclass(type(handle), text_classes):
1161 return False
1163 return isinstance(handle, _get_binary_io_classes()) or "b" in getattr(
1164 handle, "mode", mode
1165 )
1168@functools.lru_cache
1169def _get_binary_io_classes() -> tuple[type, ...]:
1170 """IO classes that that expect bytes"""
1171 binary_classes: tuple[type, ...] = (BufferedIOBase, RawIOBase)
1173 # python-zstandard doesn't use any of the builtin base classes; instead we
1174 # have to use the `zstd.ZstdDecompressionReader` class for isinstance checks.
1175 # Unfortunately `zstd.ZstdDecompressionReader` isn't exposed by python-zstandard
1176 # so we have to get it from a `zstd.ZstdDecompressor` instance.
1177 # See also https://github.com/indygreg/python-zstandard/pull/165.
1178 zstd = import_optional_dependency("zstandard", errors="ignore")
1179 if zstd is not None:
1180 with zstd.ZstdDecompressor().stream_reader(b"") as reader:
1181 binary_classes += (type(reader),)
1183 return binary_classes