Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/pandas/io/common.py: 19%

422 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1"""Common IO api utilities""" 

2from __future__ import annotations 

3 

4from abc import ( 

5 ABC, 

6 abstractmethod, 

7) 

8import bz2 

9import codecs 

10import dataclasses 

11import functools 

12import gzip 

13from io import ( 

14 BufferedIOBase, 

15 BytesIO, 

16 RawIOBase, 

17 StringIO, 

18 TextIOBase, 

19 TextIOWrapper, 

20) 

21import mmap 

22import os 

23from pathlib import Path 

24import re 

25import tarfile 

26from typing import ( 

27 IO, 

28 Any, 

29 AnyStr, 

30 Generic, 

31 Literal, 

32 Mapping, 

33 Sequence, 

34 TypeVar, 

35 cast, 

36 overload, 

37) 

38from urllib.parse import ( 

39 urljoin, 

40 urlparse as parse_url, 

41 uses_netloc, 

42 uses_params, 

43 uses_relative, 

44) 

45import warnings 

46import zipfile 

47 

48from pandas._typing import ( 

49 BaseBuffer, 

50 CompressionDict, 

51 CompressionOptions, 

52 FilePath, 

53 ReadBuffer, 

54 StorageOptions, 

55 WriteBuffer, 

56) 

57from pandas.compat import get_lzma_file 

58from pandas.compat._optional import import_optional_dependency 

59from pandas.util._decorators import doc 

60from pandas.util._exceptions import find_stack_level 

61 

62from pandas.core.dtypes.common import ( 

63 is_bool, 

64 is_file_like, 

65 is_integer, 

66 is_list_like, 

67) 

68 

69from pandas.core.shared_docs import _shared_docs 

70 

71_VALID_URLS = set(uses_relative + uses_netloc + uses_params) 

72_VALID_URLS.discard("") 

73_RFC_3986_PATTERN = re.compile(r"^[A-Za-z][A-Za-z0-9+\-+.]*://") 

74 

75BaseBufferT = TypeVar("BaseBufferT", bound=BaseBuffer) 

76 

77 

@dataclasses.dataclass
class IOArgs:
    """
    Return value of io/common.py:_get_filepath_or_buffer.
    """

    # Resolved path string or an already-open buffer (e.g. the BytesIO from a
    # URL download, or an fsspec file object).
    filepath_or_buffer: str | BaseBuffer
    # Text encoding to use if/when a text wrapper is inserted around the buffer.
    encoding: str
    # Effective mode string; may gain a trailing "b" relative to the caller's
    # requested mode (fsspec/URL handles are opened in binary).
    mode: str
    # Normalized compression mapping; always carries a "method" key.
    compression: CompressionDict
    # True when _get_filepath_or_buffer opened the buffer itself, so the
    # caller (get_handle) is responsible for closing it.
    should_close: bool = False

89 

90 

@dataclasses.dataclass
class IOHandles(Generic[AnyStr]):
    """
    Return value of io/common.py:get_handle

    Can be used as a context manager.

    This is used to easily close created buffers and to handle corner cases when
    TextIOWrapper is inserted.

    handle: The file handle to be used.
    created_handles: All file handles that are created by get_handle
    is_wrapped: Whether a TextIOWrapper needs to be detached.
    """

    # handle might not implement the IO-interface
    handle: IO[AnyStr]
    compression: CompressionDict
    created_handles: list[IO[bytes] | IO[str]] = dataclasses.field(default_factory=list)
    is_wrapped: bool = False

    def close(self) -> None:
        """
        Close all created buffers.

        Note: If a TextIOWrapper was inserted, it is flushed and detached to
        avoid closing the potentially user-created buffer.
        """
        if self.is_wrapped:
            assert isinstance(self.handle, TextIOWrapper)
            self.handle.flush()
            # detach() releases the underlying (possibly user-owned) buffer
            # without closing it; only the wrapper itself is discarded.
            self.handle.detach()
            # remove the wrapper from the list so the loop below does not
            # attempt to close the now-detached object again
            self.created_handles.remove(self.handle)
        for handle in self.created_handles:
            handle.close()
        # reset state so calling close() a second time is a no-op
        self.created_handles = []
        self.is_wrapped = False

    def __enter__(self) -> IOHandles[AnyStr]:
        return self

    def __exit__(self, *args: Any) -> None:
        self.close()

134 

135 

def is_url(url: object) -> bool:
    """
    Determine whether ``url`` is a string carrying a urllib-known protocol.

    Parameters
    ----------
    url : object
        Candidate URL; anything that is not a ``str`` is rejected outright.

    Returns
    -------
    bool
        True when the parsed scheme is one urllib recognises, else False.
    """
    if isinstance(url, str):
        return parse_url(url).scheme in _VALID_URLS
    return False

152 

153 

154@overload 

155def _expand_user(filepath_or_buffer: str) -> str: 

156 ... 

157 

158 

159@overload 

160def _expand_user(filepath_or_buffer: BaseBufferT) -> BaseBufferT: 

161 ... 

162 

163 

164def _expand_user(filepath_or_buffer: str | BaseBufferT) -> str | BaseBufferT: 

165 """ 

166 Return the argument with an initial component of ~ or ~user 

167 replaced by that user's home directory. 

168 

169 Parameters 

170 ---------- 

171 filepath_or_buffer : object to be converted if possible 

172 

173 Returns 

174 ------- 

175 expanded_filepath_or_buffer : an expanded filepath or the 

176 input if not expandable 

177 """ 

178 if isinstance(filepath_or_buffer, str): 

179 return os.path.expanduser(filepath_or_buffer) 

180 return filepath_or_buffer 

181 

182 

def validate_header_arg(header: object) -> None:
    """
    Validate the ``header`` argument accepted by the pandas readers.

    Accepted values are ``None``, a non-negative integer, or a list-like of
    non-negative integers; everything else raises.

    Raises
    ------
    ValueError
        For a negative integer, a list containing non-integers or negative
        integers, or any other unsupported object.
    TypeError
        For a bool.
    """
    if header is None:
        return

    if is_integer(header):
        row = cast(int, header)
        if row >= 0:
            return
        # GH 27779
        raise ValueError(
            "Passing negative integer to header is invalid. "
            "For no header, use header=None instead"
        )

    if is_list_like(header, allow_sets=False):
        rows = cast(Sequence, header)
        if not all(is_integer(row) for row in rows):
            raise ValueError("header must be integer or list of integers")
        if any(row < 0 for row in rows):
            raise ValueError("cannot specify multi-index header with negative integers")
        return

    if is_bool(header):
        raise TypeError(
            "Passing a bool to header is invalid. Use header=None for no header or "
            "header=int or list-like of ints to specify "
            "the row(s) making up the column names"
        )

    # GH 16338
    raise ValueError("header must be integer or list of integers")

210 

211 

@overload
def stringify_path(filepath_or_buffer: FilePath, convert_file_like: bool = ...) -> str:
    ...


@overload
def stringify_path(
    filepath_or_buffer: BaseBufferT, convert_file_like: bool = ...
) -> BaseBufferT:
    ...


def stringify_path(
    filepath_or_buffer: FilePath | BaseBufferT,
    convert_file_like: bool = False,
) -> str | BaseBufferT:
    """
    Coerce a path-like object into a plain string, expanding ``~``.

    Parameters
    ----------
    filepath_or_buffer : path-like or buffer
        Object to convert when possible.
    convert_file_like : bool, default False
        When False, file-like objects are returned unchanged even if they
        also implement ``os.PathLike``.

    Returns
    -------
    str or buffer
        A string path when conversion applies, otherwise the input object.

    Notes
    -----
    Objects implementing the fspath protocol are converted via
    ``__fspath__``; anything else (bytes, buffers, ...) passes through.
    """
    if is_file_like(filepath_or_buffer) and not convert_file_like:
        # GH 38125: some fsspec objects implement os.PathLike but have already
        # opened a file; converting them here would open the file a second
        # time. infer_compression opts in with convert_file_like=True.
        return cast(BaseBufferT, filepath_or_buffer)

    if isinstance(filepath_or_buffer, os.PathLike):
        return _expand_user(filepath_or_buffer.__fspath__())
    return _expand_user(filepath_or_buffer)

256 

257 

def urlopen(*args, **kwargs):
    """
    Thin wrapper around the stdlib ``urlopen`` that defers importing
    ``urllib.request`` (a comparatively heavy module) until first use.
    """
    from urllib.request import urlopen as _urlopen

    return _urlopen(*args, **kwargs)

266 

267 

def is_fsspec_url(url: FilePath | BaseBuffer) -> bool:
    """
    Return True when ``url`` is a string with an RFC 3986 scheme that
    should be dispatched to fsspec (i.e. anything other than plain
    http/https, which pandas handles via urllib).
    """
    if not isinstance(url, str):
        return False
    if url.startswith(("http://", "https://")):
        return False
    return _RFC_3986_PATTERN.match(url) is not None

278 

279 

@doc(
    storage_options=_shared_docs["storage_options"],
    compression_options=_shared_docs["compression_options"] % "filepath_or_buffer",
)
def _get_filepath_or_buffer(
    filepath_or_buffer: FilePath | BaseBuffer,
    encoding: str = "utf-8",
    compression: CompressionOptions = None,
    mode: str = "r",
    storage_options: StorageOptions = None,
) -> IOArgs:
    """
    If the filepath_or_buffer is a url, translate and return the buffer.
    Otherwise passthrough.

    Parameters
    ----------
    filepath_or_buffer : a url, filepath (str, py.path.local or pathlib.Path),
        or buffer
    {compression_options}

        .. versionchanged:: 1.4.0 Zstandard support.

    encoding : the encoding to use to decode bytes, default is 'utf-8'
    mode : str, optional

    {storage_options}

        .. versionadded:: 1.2.0

    .. versionchanged:: 1.2.0

        Returns the dataclass IOArgs.
    """
    # normalize path-like objects (e.g. pathlib.Path) to plain strings first
    filepath_or_buffer = stringify_path(filepath_or_buffer)

    # handle compression dict
    compression_method, compression = get_compression_method(compression)
    compression_method = infer_compression(filepath_or_buffer, compression_method)

    # GH21227 internal compression is not used for non-binary handles.
    if compression_method and hasattr(filepath_or_buffer, "write") and "b" not in mode:
        warnings.warn(
            "compression has no effect when passing a non-binary object as input.",
            RuntimeWarning,
            stacklevel=find_stack_level(),
        )
        compression_method = None

    compression = dict(compression, method=compression_method)

    # bz2 and xz do not write the byte order mark for utf-16 and utf-32
    # print a warning when writing such files
    if (
        "w" in mode
        and compression_method in ["bz2", "xz"]
        and encoding in ["utf-16", "utf-32"]
    ):
        warnings.warn(
            f"{compression} will not write the byte order mark for {encoding}",
            UnicodeWarning,
            stacklevel=find_stack_level(),
        )

    # Use binary mode when converting path-like objects to file-like objects (fsspec)
    # except when text mode is explicitly requested. The original mode is returned if
    # fsspec is not used.
    fsspec_mode = mode
    if "t" not in fsspec_mode and "b" not in fsspec_mode:
        fsspec_mode += "b"

    if isinstance(filepath_or_buffer, str) and is_url(filepath_or_buffer):
        # TODO: fsspec can also handle HTTP via requests, but leaving this
        # unchanged. using fsspec appears to break the ability to infer if the
        # server responded with gzipped data
        storage_options = storage_options or {}

        # waiting until now for importing to match intended lazy logic of
        # urlopen function defined elsewhere in this module
        import urllib.request

        # assuming storage_options is to be interpreted as headers
        req_info = urllib.request.Request(filepath_or_buffer, headers=storage_options)
        with urlopen(req_info) as req:
            content_encoding = req.headers.get("Content-Encoding", None)
            if content_encoding == "gzip":
                # Override compression based on Content-Encoding header
                compression = {"method": "gzip"}
            # the full response body is read into memory here so the returned
            # buffer remains usable after the HTTP connection is closed;
            # hence should_close=True below
            reader = BytesIO(req.read())
        return IOArgs(
            filepath_or_buffer=reader,
            encoding=encoding,
            compression=compression,
            should_close=True,
            mode=fsspec_mode,
        )

    if is_fsspec_url(filepath_or_buffer):
        assert isinstance(
            filepath_or_buffer, str
        )  # just to appease mypy for this branch
        # two special-case s3-like protocols; these have special meaning in Hadoop,
        # but are equivalent to just "s3" from fsspec's point of view
        # cc #11071
        if filepath_or_buffer.startswith("s3a://"):
            filepath_or_buffer = filepath_or_buffer.replace("s3a://", "s3://")
        if filepath_or_buffer.startswith("s3n://"):
            filepath_or_buffer = filepath_or_buffer.replace("s3n://", "s3://")
        fsspec = import_optional_dependency("fsspec")

        # If botocore is installed we fallback to reading with anon=True
        # to allow reads from public buckets
        err_types_to_retry_with_anon: list[Any] = []
        try:
            import_optional_dependency("botocore")
            from botocore.exceptions import (
                ClientError,
                NoCredentialsError,
            )

            err_types_to_retry_with_anon = [
                ClientError,
                NoCredentialsError,
                PermissionError,
            ]
        except ImportError:
            pass

        try:
            file_obj = fsspec.open(
                filepath_or_buffer, mode=fsspec_mode, **(storage_options or {})
            ).open()
        # GH 34626 Reads from Public Buckets without Credentials needs anon=True
        except tuple(err_types_to_retry_with_anon):
            if storage_options is None:
                storage_options = {"anon": True}
            else:
                # don't mutate user input.
                storage_options = dict(storage_options)
                storage_options["anon"] = True
            file_obj = fsspec.open(
                filepath_or_buffer, mode=fsspec_mode, **(storage_options or {})
            ).open()

        return IOArgs(
            filepath_or_buffer=file_obj,
            encoding=encoding,
            compression=compression,
            should_close=True,
            mode=fsspec_mode,
        )
    elif storage_options:
        # storage_options only make sense for URLs / fsspec paths
        raise ValueError(
            "storage_options passed with file object or non-fsspec file path"
        )

    if isinstance(filepath_or_buffer, (str, bytes, mmap.mmap)):
        return IOArgs(
            filepath_or_buffer=_expand_user(filepath_or_buffer),
            encoding=encoding,
            compression=compression,
            should_close=False,
            mode=mode,
        )

    # is_file_like requires (read | write) & __iter__ but __iter__ is only
    # needed for read_csv(engine=python)
    if not (
        hasattr(filepath_or_buffer, "read") or hasattr(filepath_or_buffer, "write")
    ):
        msg = f"Invalid file path or buffer object type: {type(filepath_or_buffer)}"
        raise ValueError(msg)

    return IOArgs(
        filepath_or_buffer=filepath_or_buffer,
        encoding=encoding,
        compression=compression,
        should_close=False,
        mode=mode,
    )

460 

461 

def file_path_to_url(path: str) -> str:
    """
    Convert an absolute native filesystem path into a ``file:`` URL.

    Parameters
    ----------
    path : str
        A path in native OS format.

    Returns
    -------
    str
        A valid FILE URL with special characters percent-encoded.
    """
    # lazify expensive import (~30ms)
    from urllib.request import pathname2url

    encoded = pathname2url(path)
    return urljoin("file:", encoded)

478 

479 

# Map of recognised filename extensions to compression methods.
# NOTE: insertion order matters for infer_compression — the compound
# ".tar.*" extensions must precede their plain counterparts (".gz",
# ".bz2", ".xz") because inference uses str.endswith and returns the
# first match ("x.tar.gz" must resolve to "tar", not "gzip").
_extension_to_compression = {
    ".tar": "tar",
    ".tar.gz": "tar",
    ".tar.bz2": "tar",
    ".tar.xz": "tar",
    ".gz": "gzip",
    ".bz2": "bz2",
    ".zip": "zip",
    ".xz": "xz",
    ".zst": "zstd",
}
# All compression method names pandas can dispatch on (values above).
_supported_compressions = set(_extension_to_compression.values())

492 

493 

def get_compression_method(
    compression: CompressionOptions,
) -> tuple[str | None, CompressionDict]:
    """
    Split a ``compression`` argument into a method name plus extra arguments.

    Parameters
    ----------
    compression : str, mapping or None
        When a mapping, the value under key 'method' names the compression
        method and the remaining items become keyword arguments.

    Returns
    -------
    tuple
        ``(method, kwargs)`` where ``method`` is ``str | None`` and
        ``kwargs`` is a dict of additional arguments.

    Raises
    ------
    ValueError
        When a mapping without a 'method' key is supplied.
    """
    if not isinstance(compression, Mapping):
        # a bare string (or None) carries no extra arguments
        return compression, {}

    compression_args = dict(compression)
    try:
        method: str | None = compression_args.pop("method")
    except KeyError as err:
        raise ValueError("If mapping, compression must have key 'method'") from err
    return method, compression_args

527 

528 

@doc(compression_options=_shared_docs["compression_options"] % "filepath_or_buffer")
def infer_compression(
    filepath_or_buffer: FilePath | BaseBuffer, compression: str | None
) -> str | None:
    """
    Resolve the compression method for ``filepath_or_buffer``.

    With ``compression='infer'`` the method is deduced from the filename
    extension; otherwise the given method is returned unchanged after
    validation.

    Parameters
    ----------
    filepath_or_buffer : str or file handle
        File path or object.
    {compression_options}

        .. versionchanged:: 1.4.0 Zstandard support.

    Returns
    -------
    string or None

    Raises
    ------
    ValueError on invalid compression specified.
    """
    if compression is None:
        return None

    if compression == "infer":
        # Convert all path types (e.g. pathlib.Path) to strings
        path = stringify_path(filepath_or_buffer, convert_file_like=True)
        if not isinstance(path, str):
            # Cannot infer compression of a buffer, assume no compression
            return None

        # Infer compression from the filename/URL extension; order of the
        # mapping guarantees ".tar.gz" wins over ".gz".
        lowered = path.lower()
        for extension, method in _extension_to_compression.items():
            if lowered.endswith(extension):
                return method
        return None

    # Compression was given explicitly; make sure it is one we support.
    if compression in _supported_compressions:
        return compression

    # https://github.com/python/mypy/issues/5492
    # Unsupported operand types for + ("List[Optional[str]]" and "List[str]")
    valid = ["infer", None] + sorted(_supported_compressions)  # type: ignore[operator]
    raise ValueError(
        f"Unrecognized compression type: {compression}\n"
        f"Valid compression types are {valid}"
    )

584 

585 

def check_parent_directory(path: Path | str) -> None:
    """
    Raise ``OSError`` when the parent directory of ``path`` does not exist.

    Parameters
    ----------
    path : Path or str
        File path whose parent directory is verified.

    Raises
    ------
    OSError
        If the parent directory is missing.
    """
    parent = Path(path).parent
    if parent.is_dir():
        return
    raise OSError(rf"Cannot save file into a non-existent directory: '{parent}'")

598 

599 

# The three overloads below narrow get_handle's return type based on the
# ``is_text`` flag: Literal[False] yields byte handles, Literal[True]
# (the default) yields str handles, and a plain bool yields the union.
@overload
def get_handle(
    path_or_buf: FilePath | BaseBuffer,
    mode: str,
    *,
    encoding: str | None = ...,
    compression: CompressionOptions = ...,
    memory_map: bool = ...,
    is_text: Literal[False],
    errors: str | None = ...,
    storage_options: StorageOptions = ...,
) -> IOHandles[bytes]:
    ...


@overload
def get_handle(
    path_or_buf: FilePath | BaseBuffer,
    mode: str,
    *,
    encoding: str | None = ...,
    compression: CompressionOptions = ...,
    memory_map: bool = ...,
    is_text: Literal[True] = ...,
    errors: str | None = ...,
    storage_options: StorageOptions = ...,
) -> IOHandles[str]:
    ...


@overload
def get_handle(
    path_or_buf: FilePath | BaseBuffer,
    mode: str,
    *,
    encoding: str | None = ...,
    compression: CompressionOptions = ...,
    memory_map: bool = ...,
    is_text: bool = ...,
    errors: str | None = ...,
    storage_options: StorageOptions = ...,
) -> IOHandles[str] | IOHandles[bytes]:
    ...

643 

644 

@doc(compression_options=_shared_docs["compression_options"] % "path_or_buf")
def get_handle(
    path_or_buf: FilePath | BaseBuffer,
    mode: str,
    *,
    encoding: str | None = None,
    compression: CompressionOptions = None,
    memory_map: bool = False,
    is_text: bool = True,
    errors: str | None = None,
    storage_options: StorageOptions = None,
) -> IOHandles[str] | IOHandles[bytes]:
    """
    Get file handle for given path/buffer and mode.

    Parameters
    ----------
    path_or_buf : str or file handle
        File path or object.
    mode : str
        Mode to open path_or_buf with.
    encoding : str or None
        Encoding to use.
    {compression_options}

        .. versionchanged:: 1.0.0
           May now be a dict with key 'method' as compression mode
           and other keys as compression options if compression
           mode is 'zip'.

        .. versionchanged:: 1.1.0
           Passing compression options as keys in dict is now
           supported for compression modes 'gzip', 'bz2', 'zstd' and 'zip'.

        .. versionchanged:: 1.4.0 Zstandard support.

    memory_map : bool, default False
        See parsers._parser_params for more information. Only used by read_csv.
    is_text : bool, default True
        Whether the type of the content passed to the file/buffer is string or
        bytes. This is not the same as `"b" not in mode`. If a string content is
        passed to a binary file/buffer, a wrapper is inserted.
    errors : str, default 'strict'
        Specifies how encoding and decoding errors are to be handled.
        See the errors argument for :func:`open` for a full list
        of options.
    storage_options: StorageOptions = None
        Passed to _get_filepath_or_buffer

    .. versionchanged:: 1.2.0

    Returns the dataclass IOHandles
    """
    # Windows does not default to utf-8. Set to utf-8 for a consistent behavior
    encoding = encoding or "utf-8"

    errors = errors or "strict"

    # read_csv does not know whether the buffer is opened in binary/text mode
    if _is_binary_mode(path_or_buf, mode) and "b" not in mode:
        mode += "b"

    # validate encoding and errors early; both raise LookupError on bad input
    codecs.lookup(encoding)
    if isinstance(errors, str):
        codecs.lookup_error(errors)

    # open URLs (and fsspec paths); may replace path_or_buf with a buffer
    ioargs = _get_filepath_or_buffer(
        path_or_buf,
        encoding=encoding,
        compression=compression,
        mode=mode,
        storage_options=storage_options,
    )

    handle = ioargs.filepath_or_buffer
    handles: list[BaseBuffer]

    # memory mapping needs to be the first step
    # only used for read_csv
    handle, memory_map, handles = _maybe_memory_map(handle, memory_map)

    is_path = isinstance(handle, str)
    compression_args = dict(ioargs.compression)
    compression = compression_args.pop("method")

    # Only for write methods
    if "r" not in mode and is_path:
        check_parent_directory(str(handle))

    if compression:
        if compression != "zstd":
            # compression libraries do not like an explicit text-mode
            ioargs.mode = ioargs.mode.replace("t", "")
        elif compression == "zstd" and "b" not in ioargs.mode:
            # python-zstandard defaults to text mode, but we always expect
            # compression libraries to use binary mode.
            ioargs.mode += "b"

        # GZ Compression
        if compression == "gzip":
            if isinstance(handle, str):
                # error: Incompatible types in assignment (expression has type
                # "GzipFile", variable has type "Union[str, BaseBuffer]")
                handle = gzip.GzipFile(  # type: ignore[assignment]
                    filename=handle,
                    mode=ioargs.mode,
                    **compression_args,
                )
            else:
                handle = gzip.GzipFile(
                    # No overload variant of "GzipFile" matches argument types
                    # "Union[str, BaseBuffer]", "str", "Dict[str, Any]"
                    fileobj=handle,  # type: ignore[call-overload]
                    mode=ioargs.mode,
                    **compression_args,
                )

        # BZ Compression
        elif compression == "bz2":
            # No overload variant of "BZ2File" matches argument types
            # "Union[str, BaseBuffer]", "str", "Dict[str, Any]"
            handle = bz2.BZ2File(  # type: ignore[call-overload]
                handle,
                mode=ioargs.mode,
                **compression_args,
            )

        # ZIP Compression
        elif compression == "zip":
            # error: Argument 1 to "_BytesZipFile" has incompatible type
            # "Union[str, BaseBuffer]"; expected "Union[Union[str, PathLike[str]],
            # ReadBuffer[bytes], WriteBuffer[bytes]]"
            handle = _BytesZipFile(
                handle, ioargs.mode, **compression_args  # type: ignore[arg-type]
            )
            # reading requires exactly one member file inside the archive
            if handle.buffer.mode == "r":
                handles.append(handle)
                zip_names = handle.buffer.namelist()
                if len(zip_names) == 1:
                    handle = handle.buffer.open(zip_names.pop())
                elif not zip_names:
                    raise ValueError(f"Zero files found in ZIP file {path_or_buf}")
                else:
                    raise ValueError(
                        "Multiple files found in ZIP file. "
                        f"Only one file per ZIP: {zip_names}"
                    )

        # TAR Encoding
        elif compression == "tar":
            compression_args.setdefault("mode", ioargs.mode)
            if isinstance(handle, str):
                handle = _BytesTarFile(name=handle, **compression_args)
            else:
                # error: Argument "fileobj" to "_BytesTarFile" has incompatible
                # type "BaseBuffer"; expected "Union[ReadBuffer[bytes],
                # WriteBuffer[bytes], None]"
                handle = _BytesTarFile(
                    fileobj=handle, **compression_args  # type: ignore[arg-type]
                )
            assert isinstance(handle, _BytesTarFile)
            # as with zip: reading requires exactly one archive member
            if "r" in handle.buffer.mode:
                handles.append(handle)
                files = handle.buffer.getnames()
                if len(files) == 1:
                    file = handle.buffer.extractfile(files[0])
                    assert file is not None
                    handle = file
                elif not files:
                    raise ValueError(f"Zero files found in TAR archive {path_or_buf}")
                else:
                    raise ValueError(
                        "Multiple files found in TAR archive. "
                        f"Only one file per TAR archive: {files}"
                    )

        # XZ Compression
        elif compression == "xz":
            # error: Argument 1 to "LZMAFile" has incompatible type "Union[str,
            # BaseBuffer]"; expected "Optional[Union[Union[str, bytes, PathLike[str],
            # PathLike[bytes]], IO[bytes]]]"
            handle = get_lzma_file()(handle, ioargs.mode)  # type: ignore[arg-type]

        # Zstd Compression
        elif compression == "zstd":
            zstd = import_optional_dependency("zstandard")
            if "r" in ioargs.mode:
                open_args = {"dctx": zstd.ZstdDecompressor(**compression_args)}
            else:
                open_args = {"cctx": zstd.ZstdCompressor(**compression_args)}
            handle = zstd.open(
                handle,
                mode=ioargs.mode,
                **open_args,
            )

        # Unrecognized Compression
        else:
            msg = f"Unrecognized compression type: {compression}"
            raise ValueError(msg)

        assert not isinstance(handle, str)
        handles.append(handle)

    elif isinstance(handle, str):
        # Check whether the filename is to be opened in binary mode.
        # Binary mode does not support 'encoding' and 'newline'.
        if ioargs.encoding and "b" not in ioargs.mode:
            # Encoding
            handle = open(
                handle,
                ioargs.mode,
                encoding=ioargs.encoding,
                errors=errors,
                newline="",
            )
        else:
            # Binary mode
            handle = open(handle, ioargs.mode)
        handles.append(handle)

    # Convert BytesIO or file objects passed with an encoding
    is_wrapped = False
    if not is_text and ioargs.mode == "rb" and isinstance(handle, TextIOBase):
        # not added to handles as it does not open/buffer resources
        handle = _BytesIOWrapper(
            handle,
            encoding=ioargs.encoding,
        )
    elif is_text and (
        compression or memory_map or _is_binary_mode(handle, ioargs.mode)
    ):
        if (
            not hasattr(handle, "readable")
            or not hasattr(handle, "writable")
            or not hasattr(handle, "seekable")
        ):
            # TextIOWrapper insists on the "-able" probes; shim them in
            handle = _IOWrapper(handle)
        # error: Argument 1 to "TextIOWrapper" has incompatible type
        # "_IOWrapper"; expected "IO[bytes]"
        handle = TextIOWrapper(
            handle,  # type: ignore[arg-type]
            encoding=ioargs.encoding,
            errors=errors,
            newline="",
        )
        handles.append(handle)
        # only marked as wrapped when the caller provided a handle
        is_wrapped = not (
            isinstance(ioargs.filepath_or_buffer, str) or ioargs.should_close
        )

    if "r" in ioargs.mode and not hasattr(handle, "read"):
        raise TypeError(
            "Expected file path name or file-like object, "
            f"got {type(ioargs.filepath_or_buffer)} type"
        )

    handles.reverse()  # close the most recently added buffer first
    if ioargs.should_close:
        assert not isinstance(ioargs.filepath_or_buffer, str)
        handles.append(ioargs.filepath_or_buffer)

    return IOHandles(
        # error: Argument "handle" to "IOHandles" has incompatible type
        # "Union[TextIOWrapper, GzipFile, BaseBuffer, typing.IO[bytes],
        # typing.IO[Any]]"; expected "pandas._typing.IO[Any]"
        handle=handle,  # type: ignore[arg-type]
        # error: Argument "created_handles" to "IOHandles" has incompatible type
        # "List[BaseBuffer]"; expected "List[Union[IO[bytes], IO[str]]]"
        created_handles=handles,  # type: ignore[arg-type]
        is_wrapped=is_wrapped,
        compression=ioargs.compression,
    )

921 

922 

# error: Definition of "__enter__" in base class "IOBase" is incompatible
# with definition in base class "BinaryIO"
class _BufferedWriter(BytesIO, ABC):  # type: ignore[misc]
    """
    Some objects do not support multiple .write() calls (TarFile and ZipFile).
    This wrapper writes to the underlying buffer on close.
    """

    @abstractmethod
    def write_to_buffer(self) -> None:
        # Subclasses flush self.getvalue() into self.buffer (the archive).
        ...

    def close(self) -> None:
        """Flush the accumulated bytes into the archive, then close."""
        if self.closed:
            # already closed
            return
        if self.getvalue():
            # write to buffer
            self.seek(0)
            # error: "_BufferedWriter" has no attribute "buffer"
            with self.buffer:  # type: ignore[attr-defined]
                self.write_to_buffer()
        else:
            # nothing was buffered; still close the underlying archive
            # error: "_BufferedWriter" has no attribute "buffer"
            self.buffer.close()  # type: ignore[attr-defined]
        super().close()

950 

class _BytesTarFile(_BufferedWriter):
    """
    Buffered writer targeting a single member of a ``tarfile.TarFile``.

    TarFile cannot accept incremental writes for one member, so bytes are
    accumulated in the BytesIO base class and emitted as one archive member
    when the handle is closed (see _BufferedWriter.close).
    """

    def __init__(
        self,
        name: str | None = None,
        mode: Literal["r", "a", "w", "x"] = "r",
        fileobj: ReadBuffer[bytes] | WriteBuffer[bytes] | None = None,
        archive_name: str | None = None,
        **kwargs,
    ) -> None:
        super().__init__()
        # name of the member inside the archive; inferred when None
        self.archive_name = archive_name
        self.name = name
        # error: Argument "fileobj" to "open" of "TarFile" has incompatible
        # type "Union[ReadBuffer[bytes], WriteBuffer[bytes], None]"; expected
        # "Optional[IO[bytes]]"
        self.buffer = tarfile.TarFile.open(
            name=name,
            mode=self.extend_mode(mode),
            fileobj=fileobj,  # type: ignore[arg-type]
            **kwargs,
        )

    def extend_mode(self, mode: str) -> str:
        """
        Append the tarfile compression suffix (e.g. "w" -> "w:gz") inferred
        from self.name's extension; only write mode is extended.
        """
        mode = mode.replace("b", "")
        if mode != "w":
            return mode
        if self.name is not None:
            suffix = Path(self.name).suffix
            if suffix in (".gz", ".xz", ".bz2"):
                mode = f"{mode}:{suffix[1:]}"
        return mode

    def infer_filename(self) -> str | None:
        """
        If an explicit archive_name is not given, we still want the file inside the zip
        file not to be named something.tar, because that causes confusion (GH39465).
        """
        if self.name is None:
            return None

        filename = Path(self.name)
        if filename.suffix == ".tar":
            return filename.with_suffix("").name
        # BUG FIX: Path.suffix only ever holds the LAST component (".gz" for
        # "x.tar.gz"), so the original comparison of ``filename.suffix`` to
        # ".tar.gz"/".tar.bz2"/".tar.xz" could never be true and "x.tar.gz"
        # produced the member name "x.tar" instead of "x". Match the compound
        # extensions against the full file name instead.
        if filename.name.endswith((".tar.gz", ".tar.bz2", ".tar.xz")):
            return filename.with_suffix("").with_suffix("").name
        return filename.name

    def write_to_buffer(self) -> None:
        # TarFile needs a non-empty string
        archive_name = self.archive_name or self.infer_filename() or "tar"
        tarinfo = tarfile.TarInfo(name=archive_name)
        tarinfo.size = len(self.getvalue())
        self.buffer.addfile(tarinfo, self)

1005 

class _BytesZipFile(_BufferedWriter):
    """
    Buffered writer targeting a single member of a ``zipfile.ZipFile``.

    ZipFile does not support incremental writes to one member, so bytes
    collect in the BytesIO base class and are written out in one
    ``writestr`` call when the handle closes.
    """

    def __init__(
        self,
        file: FilePath | ReadBuffer[bytes] | WriteBuffer[bytes],
        mode: str,
        archive_name: str | None = None,
        **kwargs,
    ) -> None:
        super().__init__()
        # member name inside the archive; inferred from the path when None
        self.archive_name = archive_name

        kwargs.setdefault("compression", zipfile.ZIP_DEFLATED)
        # error: Argument 1 to "ZipFile" has incompatible type "Union[
        # Union[str, PathLike[str]], ReadBuffer[bytes], WriteBuffer[bytes]]";
        # expected "Union[Union[str, PathLike[str]], IO[bytes]]"
        self.buffer = zipfile.ZipFile(  # type: ignore[arg-type]
            file, mode.replace("b", ""), **kwargs
        )

    def infer_filename(self) -> str | None:
        """
        If an explicit archive_name is not given, we still want the file inside the zip
        file not to be named something.zip, because that causes confusion (GH39465).
        """
        if not isinstance(self.buffer.filename, (os.PathLike, str)):
            return None
        filename = Path(self.buffer.filename)
        if filename.suffix == ".zip":
            return filename.with_suffix("").name
        return filename.name

    def write_to_buffer(self) -> None:
        # ZipFile needs a non-empty string
        archive_name = self.archive_name or self.infer_filename() or "zip"
        self.buffer.writestr(archive_name, self.getvalue())

1041 

1042class _IOWrapper: 

1043 # TextIOWrapper is overly strict: it request that the buffer has seekable, readable, 

1044 # and writable. If we have a read-only buffer, we shouldn't need writable and vice 

1045 # versa. Some buffers, are seek/read/writ-able but they do not have the "-able" 

1046 # methods, e.g., tempfile.SpooledTemporaryFile. 

1047 # If a buffer does not have the above "-able" methods, we simple assume they are 

1048 # seek/read/writ-able. 

1049 def __init__(self, buffer: BaseBuffer) -> None: 

1050 self.buffer = buffer 

1051 

1052 def __getattr__(self, name: str): 

1053 return getattr(self.buffer, name) 

1054 

1055 def readable(self) -> bool: 

1056 if hasattr(self.buffer, "readable"): 

1057 # error: "BaseBuffer" has no attribute "readable" 

1058 return self.buffer.readable() # type: ignore[attr-defined] 

1059 return True 

1060 

1061 def seekable(self) -> bool: 

1062 if hasattr(self.buffer, "seekable"): 

1063 return self.buffer.seekable() 

1064 return True 

1065 

1066 def writable(self) -> bool: 

1067 if hasattr(self.buffer, "writable"): 

1068 # error: "BaseBuffer" has no attribute "writable" 

1069 return self.buffer.writable() # type: ignore[attr-defined] 

1070 return True 

1071 

1072 

1073class _BytesIOWrapper: 

1074 # Wrapper that wraps a StringIO buffer and reads bytes from it 

1075 # Created for compat with pyarrow read_csv 

1076 def __init__(self, buffer: StringIO | TextIOBase, encoding: str = "utf-8") -> None: 

1077 self.buffer = buffer 

1078 self.encoding = encoding 

1079 # Because a character can be represented by more than 1 byte, 

1080 # it is possible that reading will produce more bytes than n 

1081 # We store the extra bytes in this overflow variable, and append the 

1082 # overflow to the front of the bytestring the next time reading is performed 

1083 self.overflow = b"" 

1084 

1085 def __getattr__(self, attr: str): 

1086 return getattr(self.buffer, attr) 

1087 

1088 def read(self, n: int | None = -1) -> bytes: 

1089 assert self.buffer is not None 

1090 bytestring = self.buffer.read(n).encode(self.encoding) 

1091 # When n=-1/n greater than remaining bytes: Read entire file/rest of file 

1092 combined_bytestring = self.overflow + bytestring 

1093 if n is None or n < 0 or n >= len(combined_bytestring): 

1094 self.overflow = b"" 

1095 return combined_bytestring 

1096 else: 

1097 to_return = combined_bytestring[:n] 

1098 self.overflow = combined_bytestring[n:] 

1099 return to_return 

1100 

1101 

1102def _maybe_memory_map( 

1103 handle: str | BaseBuffer, memory_map: bool 

1104) -> tuple[str | BaseBuffer, bool, list[BaseBuffer]]: 

1105 """Try to memory map file/buffer.""" 

1106 handles: list[BaseBuffer] = [] 

1107 memory_map &= hasattr(handle, "fileno") or isinstance(handle, str) 

1108 if not memory_map: 

1109 return handle, memory_map, handles 

1110 

1111 # need to open the file first 

1112 if isinstance(handle, str): 

1113 handle = open(handle, "rb") 

1114 handles.append(handle) 

1115 

1116 try: 

1117 # open mmap and adds *-able 

1118 # error: Argument 1 to "_IOWrapper" has incompatible type "mmap"; 

1119 # expected "BaseBuffer" 

1120 wrapped = _IOWrapper( 

1121 mmap.mmap( 

1122 handle.fileno(), 0, access=mmap.ACCESS_READ # type: ignore[arg-type] 

1123 ) 

1124 ) 

1125 finally: 

1126 for handle in reversed(handles): 

1127 # error: "BaseBuffer" has no attribute "close" 

1128 handle.close() # type: ignore[attr-defined] 

1129 

1130 return wrapped, memory_map, [wrapped] 

1131 

1132 

def file_exists(filepath_or_buffer: FilePath | BaseBuffer) -> bool:
    """Test whether file exists; non-path inputs never exist."""
    path = stringify_path(filepath_or_buffer)
    if not isinstance(path, str):
        return False
    try:
        return os.path.exists(path)
    except (TypeError, ValueError):
        # gh-5874: an overly long filepath can raise instead of returning False
        return False

1145 

1146 

1147def _is_binary_mode(handle: FilePath | BaseBuffer, mode: str) -> bool: 

1148 """Whether the handle is opened in binary mode""" 

1149 # specified by user 

1150 if "t" in mode or "b" in mode: 

1151 return "b" in mode 

1152 

1153 # exceptions 

1154 text_classes = ( 

1155 # classes that expect string but have 'b' in mode 

1156 codecs.StreamWriter, 

1157 codecs.StreamReader, 

1158 codecs.StreamReaderWriter, 

1159 ) 

1160 if issubclass(type(handle), text_classes): 

1161 return False 

1162 

1163 return isinstance(handle, _get_binary_io_classes()) or "b" in getattr( 

1164 handle, "mode", mode 

1165 ) 

1166 

1167 

@functools.lru_cache
def _get_binary_io_classes() -> tuple[type, ...]:
    """IO classes that expect bytes"""
    classes: tuple[type, ...] = (BufferedIOBase, RawIOBase)

    # python-zstandard's reader derives from none of the builtin IO base
    # classes, and `zstd.ZstdDecompressionReader` is not exposed by the
    # package, so the class object is obtained from a throwaway
    # `ZstdDecompressor` stream instead.
    # See also https://github.com/indygreg/python-zstandard/pull/165.
    zstd = import_optional_dependency("zstandard", errors="ignore")
    if zstd is not None:
        with zstd.ZstdDecompressor().stream_reader(b"") as reader:
            classes += (type(reader),)

    return classes