Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/pandas/io/parquet.py: 16%

158 statements  

coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1""" parquet compat """ 

2from __future__ import annotations 

3 

4import io 

5import os 

6from typing import Any 

7from warnings import catch_warnings 

8 

9from pandas._typing import ( 

10 FilePath, 

11 ReadBuffer, 

12 StorageOptions, 

13 WriteBuffer, 

14) 

15from pandas.compat._optional import import_optional_dependency 

16from pandas.errors import AbstractMethodError 

17from pandas.util._decorators import doc 

18 

19from pandas import ( 

20 DataFrame, 

21 MultiIndex, 

22 get_option, 

23) 

24from pandas.core.shared_docs import _shared_docs 

25from pandas.util.version import Version 

26 

27from pandas.io.common import ( 

28 IOHandles, 

29 get_handle, 

30 is_fsspec_url, 

31 is_url, 

32 stringify_path, 

33) 

34 

35 

def get_engine(engine: str) -> BaseImpl:
    """return our implementation"""
    if engine == "auto":
        engine = get_option("io.parquet.engine")

    if engine == "auto":
        # try engines in this order
        engine_classes = [PyArrowImpl, FastParquetImpl]

        error_msgs = ""
        for engine_class in engine_classes:
            try:
                return engine_class()
            except ImportError as err:
                error_msgs += "\n - " + str(err)

        raise ImportError(
            "Unable to find a usable engine; "
            "tried using: 'pyarrow', 'fastparquet'.\n"
            "A suitable version of "
            "pyarrow or fastparquet is required for parquet "
            "support.\n"
            "Trying to import the above resulted in these errors:"
            f"{error_msgs}"
        )

    if engine == "pyarrow":
        return PyArrowImpl()
    elif engine == "fastparquet":
        return FastParquetImpl()

    raise ValueError("engine must be one of 'pyarrow', 'fastparquet'")
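
# --- Illustrative sketch, not part of pandas: how get_engine resolves "auto". ---
# It consults the "io.parquet.engine" option and then tries pyarrow before
# fastparquet; the _example_* helper below is hypothetical and never called here.
def _example_engine_selection() -> None:
    import pandas as pd

    print(pd.get_option("io.parquet.engine"))  # typically "auto"

    impl = get_engine("auto")
    print(type(impl).__name__)  # PyArrowImpl if pyarrow is importable

    # Anything other than 'auto', 'pyarrow' or 'fastparquet' is rejected.
    try:
        get_engine("not-an-engine")
    except ValueError as err:
        print(err)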

def _get_path_or_handle(
    path: FilePath | ReadBuffer[bytes] | WriteBuffer[bytes],
    fs: Any,
    storage_options: StorageOptions = None,
    mode: str = "rb",
    is_dir: bool = False,
) -> tuple[
    FilePath | ReadBuffer[bytes] | WriteBuffer[bytes], IOHandles[bytes] | None, Any
]:
    """File handling for PyArrow."""
    path_or_handle = stringify_path(path)
    if is_fsspec_url(path_or_handle) and fs is None:
        fsspec = import_optional_dependency("fsspec")

        fs, path_or_handle = fsspec.core.url_to_fs(
            path_or_handle, **(storage_options or {})
        )
    elif storage_options and (not is_url(path_or_handle) or mode != "rb"):
        # can't write to a remote url
        # without making use of fsspec at the moment
        raise ValueError("storage_options passed with buffer, or non-supported URL")

    handles = None
    if (
        not fs
        and not is_dir
        and isinstance(path_or_handle, str)
        and not os.path.isdir(path_or_handle)
    ):
        # use get_handle only when we are very certain that it is not a directory
        # fsspec resources can also point to directories
        # this branch is used for example when reading from non-fsspec URLs
        handles = get_handle(
            path_or_handle, mode, is_text=False, storage_options=storage_options
        )
        fs = None
        path_or_handle = handles.handle
    return path_or_handle, handles, fs
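
# --- Illustrative sketch, not part of pandas: the fsspec branch above delegates to
# fsspec.core.url_to_fs, which splits a URL into (filesystem, path). Assumes fsspec
# and s3fs are installed; the bucket name and the anonymous-access option are
# hypothetical.
def _example_url_to_fs() -> None:
    import fsspec

    fs, resolved = fsspec.core.url_to_fs("s3://example-bucket/data.parquet", anon=True)
    print(type(fs).__name__, resolved)  # e.g. S3FileSystem, "example-bucket/data.parquet"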

class BaseImpl:
    @staticmethod
    def validate_dataframe(df: DataFrame) -> None:

        if not isinstance(df, DataFrame):
            raise ValueError("to_parquet only supports IO with DataFrames")

        # must have value column names for all index levels (strings only)
        if isinstance(df.columns, MultiIndex):
            if not all(
                x.inferred_type in {"string", "empty"} for x in df.columns.levels
            ):
                raise ValueError(
                    """
                    parquet must have string column names for all values in
                    each level of the MultiIndex
                    """
                )
        else:
            if df.columns.inferred_type not in {"string", "empty"}:
                raise ValueError("parquet must have string column names")

        # index level names must be strings
        valid_names = all(
            isinstance(name, str) for name in df.index.names if name is not None
        )
        if not valid_names:
            raise ValueError("Index level names must be strings")

    def write(self, df: DataFrame, path, compression, **kwargs):
        raise AbstractMethodError(self)

    def read(self, path, columns=None, **kwargs) -> DataFrame:
        raise AbstractMethodError(self)
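
# --- Illustrative sketch, not part of pandas: validate_dataframe accepts only
# DataFrames whose column names and index level names are strings. The _example_*
# helper is hypothetical.
def _example_validate_dataframe() -> None:
    import pandas as pd

    BaseImpl.validate_dataframe(pd.DataFrame({"a": [1], "b": [2]}))  # passes

    try:
        BaseImpl.validate_dataframe(pd.DataFrame({0: [1], 1: [2]}))
    except ValueError as err:
        print(err)  # parquet must have string column names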

class PyArrowImpl(BaseImpl):
    def __init__(self) -> None:
        import_optional_dependency(
            "pyarrow", extra="pyarrow is required for parquet support."
        )
        import pyarrow.parquet

        # import utils to register the pyarrow extension types
        import pandas.core.arrays.arrow.extension_types  # pyright: ignore # noqa:F401

        self.api = pyarrow

    def write(
        self,
        df: DataFrame,
        path: FilePath | WriteBuffer[bytes],
        compression: str | None = "snappy",
        index: bool | None = None,
        storage_options: StorageOptions = None,
        partition_cols: list[str] | None = None,
        **kwargs,
    ) -> None:
        self.validate_dataframe(df)

        from_pandas_kwargs: dict[str, Any] = {"schema": kwargs.pop("schema", None)}
        if index is not None:
            from_pandas_kwargs["preserve_index"] = index

        table = self.api.Table.from_pandas(df, **from_pandas_kwargs)

        path_or_handle, handles, kwargs["filesystem"] = _get_path_or_handle(
            path,
            kwargs.pop("filesystem", None),
            storage_options=storage_options,
            mode="wb",
            is_dir=partition_cols is not None,
        )
        if (
            isinstance(path_or_handle, io.BufferedWriter)
            and hasattr(path_or_handle, "name")
            and isinstance(path_or_handle.name, (str, bytes))
        ):
            path_or_handle = path_or_handle.name
            if isinstance(path_or_handle, bytes):
                path_or_handle = path_or_handle.decode()

        try:
            if partition_cols is not None:
                # writes to multiple files under the given path
                self.api.parquet.write_to_dataset(
                    table,
                    path_or_handle,
                    compression=compression,
                    partition_cols=partition_cols,
                    **kwargs,
                )
            else:
                # write to single output file
                self.api.parquet.write_table(
                    table, path_or_handle, compression=compression, **kwargs
                )
        finally:
            if handles is not None:
                handles.close()

    def read(
        self,
        path,
        columns=None,
        use_nullable_dtypes=False,
        storage_options: StorageOptions = None,
        **kwargs,
    ) -> DataFrame:
        kwargs["use_pandas_metadata"] = True

        to_pandas_kwargs = {}
        if use_nullable_dtypes:
            import pandas as pd

            mapping = {
                self.api.int8(): pd.Int8Dtype(),
                self.api.int16(): pd.Int16Dtype(),
                self.api.int32(): pd.Int32Dtype(),
                self.api.int64(): pd.Int64Dtype(),
                self.api.uint8(): pd.UInt8Dtype(),
                self.api.uint16(): pd.UInt16Dtype(),
                self.api.uint32(): pd.UInt32Dtype(),
                self.api.uint64(): pd.UInt64Dtype(),
                self.api.bool_(): pd.BooleanDtype(),
                self.api.string(): pd.StringDtype(),
                self.api.float32(): pd.Float32Dtype(),
                self.api.float64(): pd.Float64Dtype(),
            }
            to_pandas_kwargs["types_mapper"] = mapping.get
        manager = get_option("mode.data_manager")
        if manager == "array":
            to_pandas_kwargs["split_blocks"] = True  # type: ignore[assignment]

        path_or_handle, handles, kwargs["filesystem"] = _get_path_or_handle(
            path,
            kwargs.pop("filesystem", None),
            storage_options=storage_options,
            mode="rb",
        )
        try:
            result = self.api.parquet.read_table(
                path_or_handle, columns=columns, **kwargs
            ).to_pandas(**to_pandas_kwargs)
            if manager == "array":
                result = result._as_manager("array", copy=False)
            return result
        finally:
            if handles is not None:
                handles.close()
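
# --- Illustrative sketch, not part of pandas: the two pyarrow code paths above as
# seen from the public API. partition_cols switches write() from a single file to a
# hive-style directory tree, and use_nullable_dtypes activates the Arrow-to-pandas
# type mapping in read(). File names are hypothetical; assumes pyarrow is installed.
def _example_pyarrow_engine() -> None:
    import pandas as pd

    df = pd.DataFrame({"year": [2022, 2022, 2023], "value": [1, 2, 3]})

    # Partitioned write: creates out_dataset/year=2022/... and out_dataset/year=2023/...
    df.to_parquet("out_dataset", engine="pyarrow", partition_cols=["year"])

    # Single-file write, then read back with and without nullable dtypes.
    df.to_parquet("single.parquet", engine="pyarrow")
    plain = pd.read_parquet("single.parquet", engine="pyarrow")
    nullable = pd.read_parquet(
        "single.parquet", engine="pyarrow", use_nullable_dtypes=True
    )
    print(plain.dtypes["value"], nullable.dtypes["value"])  # int64 vs. Int64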

class FastParquetImpl(BaseImpl):
    def __init__(self) -> None:
        # since pandas is a dependency of fastparquet
        # we need to import on first use
        fastparquet = import_optional_dependency(
            "fastparquet", extra="fastparquet is required for parquet support."
        )
        self.api = fastparquet

    def write(
        self,
        df: DataFrame,
        path,
        compression="snappy",
        index=None,
        partition_cols=None,
        storage_options: StorageOptions = None,
        **kwargs,
    ) -> None:
        self.validate_dataframe(df)
        # thriftpy/protocol/compact.py:339:
        # DeprecationWarning: tostring() is deprecated.
        # Use tobytes() instead.

        if "partition_on" in kwargs and partition_cols is not None:
            raise ValueError(
                "Cannot use both partition_on and "
                "partition_cols. Use partition_cols for partitioning data"
            )
        elif "partition_on" in kwargs:
            partition_cols = kwargs.pop("partition_on")

        if partition_cols is not None:
            kwargs["file_scheme"] = "hive"

        # cannot use get_handle as write() does not accept file buffers
        path = stringify_path(path)
        if is_fsspec_url(path):
            fsspec = import_optional_dependency("fsspec")

            # if filesystem is provided by fsspec, file must be opened in 'wb' mode.
            kwargs["open_with"] = lambda path, _: fsspec.open(
                path, "wb", **(storage_options or {})
            ).open()
        elif storage_options:
            raise ValueError(
                "storage_options passed with file object or non-fsspec file path"
            )

        with catch_warnings(record=True):
            self.api.write(
                path,
                df,
                compression=compression,
                write_index=index,
                partition_on=partition_cols,
                **kwargs,
            )

    def read(
        self, path, columns=None, storage_options: StorageOptions = None, **kwargs
    ) -> DataFrame:
        parquet_kwargs: dict[str, Any] = {}
        use_nullable_dtypes = kwargs.pop("use_nullable_dtypes", False)
        if Version(self.api.__version__) >= Version("0.7.1"):
            # We are disabling nullable dtypes for fastparquet pending discussion
            parquet_kwargs["pandas_nulls"] = False
        if use_nullable_dtypes:
            raise ValueError(
                "The 'use_nullable_dtypes' argument is not supported for the "
                "fastparquet engine"
            )
        path = stringify_path(path)
        handles = None
        if is_fsspec_url(path):
            fsspec = import_optional_dependency("fsspec")

            if Version(self.api.__version__) > Version("0.6.1"):
                parquet_kwargs["fs"] = fsspec.open(
                    path, "rb", **(storage_options or {})
                ).fs
            else:
                parquet_kwargs["open_with"] = lambda path, _: fsspec.open(
                    path, "rb", **(storage_options or {})
                ).open()
        elif isinstance(path, str) and not os.path.isdir(path):
            # use get_handle only when we are very certain that it is not a directory
            # fsspec resources can also point to directories
            # this branch is used for example when reading from non-fsspec URLs
            handles = get_handle(
                path, "rb", is_text=False, storage_options=storage_options
            )
            path = handles.handle

        try:
            parquet_file = self.api.ParquetFile(path, **parquet_kwargs)
            return parquet_file.to_pandas(columns=columns, **kwargs)
        finally:
            if handles is not None:
                handles.close()
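
# --- Illustrative sketch, not part of pandas: two quirks of the fastparquet engine
# shown above. Its write() refuses a mix of partition_on and partition_cols, and its
# read() rejects use_nullable_dtypes. Paths are hypothetical; assumes fastparquet is
# installed.
def _example_fastparquet_engine() -> None:
    import pandas as pd

    df = pd.DataFrame({"year": [2022, 2023], "value": [1.0, 2.0]})
    df.to_parquet("fp_dataset", engine="fastparquet", partition_cols=["year"])

    try:
        df.to_parquet(
            "fp_dataset",
            engine="fastparquet",
            partition_cols=["year"],
            partition_on=["year"],
        )
    except ValueError as err:
        print(err)  # cannot use both partition_on and partition_cols

    try:
        pd.read_parquet("fp_dataset", engine="fastparquet", use_nullable_dtypes=True)
    except ValueError as err:
        print(err)  # not supported for the fastparquet engine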

@doc(storage_options=_shared_docs["storage_options"])
def to_parquet(
    df: DataFrame,
    path: FilePath | WriteBuffer[bytes] | None = None,
    engine: str = "auto",
    compression: str | None = "snappy",
    index: bool | None = None,
    storage_options: StorageOptions = None,
    partition_cols: list[str] | None = None,
    **kwargs,
) -> bytes | None:
    """
    Write a DataFrame to the parquet format.

    Parameters
    ----------
    df : DataFrame
    path : str, path object, file-like object, or None, default None
        String, path object (implementing ``os.PathLike[str]``), or file-like
        object implementing a binary ``write()`` function. If None, the result is
        returned as bytes. If a string, it will be used as Root Directory path
        when writing a partitioned dataset. The engine fastparquet does not
        accept file-like objects.

        .. versionchanged:: 1.2.0

    engine : {{'auto', 'pyarrow', 'fastparquet'}}, default 'auto'
        Parquet library to use. If 'auto', then the option
        ``io.parquet.engine`` is used. The default ``io.parquet.engine``
        behavior is to try 'pyarrow', falling back to 'fastparquet' if
        'pyarrow' is unavailable.
    compression : {{'snappy', 'gzip', 'brotli', 'lz4', 'zstd', None}},
        default 'snappy'. Name of the compression to use. Use ``None``
        for no compression. The supported compression methods actually
        depend on which engine is used. For 'pyarrow', 'snappy', 'gzip',
        'brotli', 'lz4', 'zstd' are all supported. For 'fastparquet',
        only 'gzip' and 'snappy' are supported.
    index : bool, default None
        If ``True``, include the dataframe's index(es) in the file output. If
        ``False``, they will not be written to the file.
        If ``None``, similar to ``True`` the dataframe's index(es)
        will be saved. However, instead of being saved as values,
        the RangeIndex will be stored as a range in the metadata so it
        doesn't require much space and is faster. Other indexes will
        be included as columns in the file output.
    partition_cols : str or list, optional, default None
        Column names by which to partition the dataset.
        Columns are partitioned in the order they are given.
        Must be None if path is not a string.
    {storage_options}

        .. versionadded:: 1.2.0

    kwargs
        Additional keyword arguments passed to the engine

    Returns
    -------
    bytes if no path argument is provided else None
    """
    if isinstance(partition_cols, str):
        partition_cols = [partition_cols]
    impl = get_engine(engine)

    path_or_buf: FilePath | WriteBuffer[bytes] = io.BytesIO() if path is None else path

    impl.write(
        df,
        path_or_buf,
        compression=compression,
        index=index,
        partition_cols=partition_cols,
        storage_options=storage_options,
        **kwargs,
    )

    if path is None:
        assert isinstance(path_or_buf, io.BytesIO)
        return path_or_buf.getvalue()
    else:
        return None
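
# --- Illustrative sketch, not part of pandas: when path is None, to_parquet serializes
# into an in-memory BytesIO and returns its bytes, as the branch above shows.
def _example_to_parquet_bytes() -> None:
    import pandas as pd

    df = pd.DataFrame({"a": [1, 2], "b": ["x", "y"]})
    payload = df.to_parquet(path=None)  # bytes, no file written
    roundtrip = pd.read_parquet(io.BytesIO(payload))
    assert roundtrip.equals(df)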

@doc(storage_options=_shared_docs["storage_options"])
def read_parquet(
    path: FilePath | ReadBuffer[bytes],
    engine: str = "auto",
    columns: list[str] | None = None,
    storage_options: StorageOptions = None,
    use_nullable_dtypes: bool = False,
    **kwargs,
) -> DataFrame:
    """
    Load a parquet object from the file path, returning a DataFrame.

    Parameters
    ----------
    path : str, path object or file-like object
        String, path object (implementing ``os.PathLike[str]``), or file-like
        object implementing a binary ``read()`` function.
        The string could be a URL. Valid URL schemes include http, ftp, s3,
        gs, and file. For file URLs, a host is expected. A local file could be:
        ``file://localhost/path/to/table.parquet``.
        A file URL can also be a path to a directory that contains multiple
        partitioned parquet files. Both pyarrow and fastparquet support
        paths to directories as well as file URLs. A directory path could be:
        ``file://localhost/path/to/tables`` or ``s3://bucket/partition_dir``.
    engine : {{'auto', 'pyarrow', 'fastparquet'}}, default 'auto'
        Parquet library to use. If 'auto', then the option
        ``io.parquet.engine`` is used. The default ``io.parquet.engine``
        behavior is to try 'pyarrow', falling back to 'fastparquet' if
        'pyarrow' is unavailable.
    columns : list, default=None
        If not None, only these columns will be read from the file.

    {storage_options}

        .. versionadded:: 1.3.0

    use_nullable_dtypes : bool, default False
        If True, use dtypes that use ``pd.NA`` as missing value indicator
        for the resulting DataFrame. (only applicable for the ``pyarrow``
        engine)
        As new dtypes are added that support ``pd.NA`` in the future, the
        output with this option will change to use those dtypes.
        Note: this is an experimental option, and behaviour (e.g. additional
        support dtypes) may change without notice.

        .. versionadded:: 1.2.0

    **kwargs
        Any additional kwargs are passed to the engine.

    Returns
    -------
    DataFrame
    """
    impl = get_engine(engine)

    return impl.read(
        path,
        columns=columns,
        storage_options=storage_options,
        use_nullable_dtypes=use_nullable_dtypes,
        **kwargs,
    )
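
# --- Illustrative sketch, not part of pandas: read_parquet accepts single files,
# directories of partitioned files, and fsspec URLs; columns restricts what is read.
# The local path, the bucket name, and the anonymous-access option are hypothetical.
def _example_read_parquet() -> None:
    import pandas as pd

    local = pd.read_parquet("out_dataset", columns=["value"])
    remote = pd.read_parquet(
        "s3://example-bucket/data.parquet",
        storage_options={"anon": True},
    )
    print(local.shape, remote.shape)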