Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/pandas/io/parquet.py: 16%
158 statements
coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1""" parquet compat """
2from __future__ import annotations
4import io
5import os
6from typing import Any
7from warnings import catch_warnings
9from pandas._typing import (
10 FilePath,
11 ReadBuffer,
12 StorageOptions,
13 WriteBuffer,
14)
15from pandas.compat._optional import import_optional_dependency
16from pandas.errors import AbstractMethodError
17from pandas.util._decorators import doc
19from pandas import (
20 DataFrame,
21 MultiIndex,
22 get_option,
23)
24from pandas.core.shared_docs import _shared_docs
25from pandas.util.version import Version
27from pandas.io.common import (
28 IOHandles,
29 get_handle,
30 is_fsspec_url,
31 is_url,
32 stringify_path,
33)

def get_engine(engine: str) -> BaseImpl:
    """return our implementation"""
    if engine == "auto":
        engine = get_option("io.parquet.engine")

    if engine == "auto":
        # try engines in this order
        engine_classes = [PyArrowImpl, FastParquetImpl]

        error_msgs = ""
        for engine_class in engine_classes:
            try:
                return engine_class()
            except ImportError as err:
                error_msgs += "\n - " + str(err)

        raise ImportError(
            "Unable to find a usable engine; "
            "tried using: 'pyarrow', 'fastparquet'.\n"
            "A suitable version of "
            "pyarrow or fastparquet is required for parquet "
            "support.\n"
            "Trying to import the above resulted in these errors:"
            f"{error_msgs}"
        )

    if engine == "pyarrow":
        return PyArrowImpl()
    elif engine == "fastparquet":
        return FastParquetImpl()

    raise ValueError("engine must be one of 'pyarrow', 'fastparquet'")
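
# Illustrative sketch (not part of the module): how engine resolution above
# behaves, assuming pyarrow is importable in the environment.
#
#     impl = get_engine("auto")        # resolves via the "io.parquet.engine"
#                                      # option, trying pyarrow before
#                                      # fastparquet
#     isinstance(impl, PyArrowImpl)    # True when pyarrow is installed
#     get_engine("odbc")               # raises ValueError: engine must be
#                                      # one of 'pyarrow', 'fastparquet'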

def _get_path_or_handle(
    path: FilePath | ReadBuffer[bytes] | WriteBuffer[bytes],
    fs: Any,
    storage_options: StorageOptions = None,
    mode: str = "rb",
    is_dir: bool = False,
) -> tuple[
    FilePath | ReadBuffer[bytes] | WriteBuffer[bytes], IOHandles[bytes] | None, Any
]:
    """File handling for PyArrow."""
    path_or_handle = stringify_path(path)
    if is_fsspec_url(path_or_handle) and fs is None:
        fsspec = import_optional_dependency("fsspec")

        fs, path_or_handle = fsspec.core.url_to_fs(
            path_or_handle, **(storage_options or {})
        )
    elif storage_options and (not is_url(path_or_handle) or mode != "rb"):
        # can't write to a remote url
        # without making use of fsspec at the moment
        raise ValueError("storage_options passed with buffer, or non-supported URL")

    handles = None
    if (
        not fs
        and not is_dir
        and isinstance(path_or_handle, str)
        and not os.path.isdir(path_or_handle)
    ):
        # use get_handle only when we are very certain that it is not a directory
        # fsspec resources can also point to directories
        # this branch is used for example when reading from non-fsspec URLs
        handles = get_handle(
            path_or_handle, mode, is_text=False, storage_options=storage_options
        )
        fs = None
        path_or_handle = handles.handle
    return path_or_handle, handles, fs
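
# Illustrative sketch (not part of the module): the three shapes the helper
# above can return, assuming a local file, an fsspec URL, and an open buffer.
#
#     _get_path_or_handle("data.parquet", None, mode="rb")
#         -> (opened handle, IOHandles wrapper, None)    # local file
#     _get_path_or_handle("s3://bucket/key.parquet", None)
#         -> ("bucket/key.parquet", None, s3 filesystem) # fsspec: path + fs
#     _get_path_or_handle(io.BytesIO(), None, mode="wb")
#         -> (the same BytesIO, None, None)              # buffers pass through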

class BaseImpl:
    @staticmethod
    def validate_dataframe(df: DataFrame) -> None:

        if not isinstance(df, DataFrame):
            raise ValueError("to_parquet only supports IO with DataFrames")

        # must have value column names for all index levels (strings only)
        if isinstance(df.columns, MultiIndex):
            if not all(
                x.inferred_type in {"string", "empty"} for x in df.columns.levels
            ):
                raise ValueError(
                    """
                    parquet must have string column names for all values in
                    each level of the MultiIndex
                    """
                )
        else:
            if df.columns.inferred_type not in {"string", "empty"}:
                raise ValueError("parquet must have string column names")

        # index level names must be strings
        valid_names = all(
            isinstance(name, str) for name in df.index.names if name is not None
        )
        if not valid_names:
            raise ValueError("Index level names must be strings")

    def write(self, df: DataFrame, path, compression, **kwargs):
        raise AbstractMethodError(self)

    def read(self, path, columns=None, **kwargs) -> DataFrame:
        raise AbstractMethodError(self)
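
# Illustrative sketch (not part of the module): validate_dataframe rejects
# non-string column names before any engine-specific work happens.
#
#     df = DataFrame({0: [1, 2]})          # integer column name
#     BaseImpl.validate_dataframe(df)      # raises ValueError: parquet must
#                                          # have string column names
#     df.columns = ["a"]
#     BaseImpl.validate_dataframe(df)      # passes silently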

class PyArrowImpl(BaseImpl):
    def __init__(self) -> None:
        import_optional_dependency(
            "pyarrow", extra="pyarrow is required for parquet support."
        )
        import pyarrow.parquet

        # import utils to register the pyarrow extension types
        import pandas.core.arrays.arrow.extension_types  # pyright: ignore # noqa:F401

        self.api = pyarrow

    def write(
        self,
        df: DataFrame,
        path: FilePath | WriteBuffer[bytes],
        compression: str | None = "snappy",
        index: bool | None = None,
        storage_options: StorageOptions = None,
        partition_cols: list[str] | None = None,
        **kwargs,
    ) -> None:
        self.validate_dataframe(df)

        from_pandas_kwargs: dict[str, Any] = {"schema": kwargs.pop("schema", None)}
        if index is not None:
            from_pandas_kwargs["preserve_index"] = index

        table = self.api.Table.from_pandas(df, **from_pandas_kwargs)

        path_or_handle, handles, kwargs["filesystem"] = _get_path_or_handle(
            path,
            kwargs.pop("filesystem", None),
            storage_options=storage_options,
            mode="wb",
            is_dir=partition_cols is not None,
        )
        if (
            isinstance(path_or_handle, io.BufferedWriter)
            and hasattr(path_or_handle, "name")
            and isinstance(path_or_handle.name, (str, bytes))
        ):
            path_or_handle = path_or_handle.name
            if isinstance(path_or_handle, bytes):
                path_or_handle = path_or_handle.decode()

        try:
            if partition_cols is not None:
                # writes to multiple files under the given path
                self.api.parquet.write_to_dataset(
                    table,
                    path_or_handle,
                    compression=compression,
                    partition_cols=partition_cols,
                    **kwargs,
                )
            else:
                # write to single output file
                self.api.parquet.write_table(
                    table, path_or_handle, compression=compression, **kwargs
                )
        finally:
            if handles is not None:
                handles.close()
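
    # Illustrative sketch (not part of the module): with partition_cols the
    # table is written as a hive-partitioned dataset (one directory per
    # distinct value); without it, a single file. Assumes pyarrow installed.
    #
    #     impl = PyArrowImpl()
    #     impl.write(df, "out_dir", compression="snappy",
    #                partition_cols=["year"])   # -> out_dir/year=2023/...
    #     impl.write(df, "out.parquet",
    #                compression="snappy")      # -> one file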

    def read(
        self,
        path,
        columns=None,
        use_nullable_dtypes=False,
        storage_options: StorageOptions = None,
        **kwargs,
    ) -> DataFrame:
        kwargs["use_pandas_metadata"] = True

        to_pandas_kwargs = {}
        if use_nullable_dtypes:
            import pandas as pd

            mapping = {
                self.api.int8(): pd.Int8Dtype(),
                self.api.int16(): pd.Int16Dtype(),
                self.api.int32(): pd.Int32Dtype(),
                self.api.int64(): pd.Int64Dtype(),
                self.api.uint8(): pd.UInt8Dtype(),
                self.api.uint16(): pd.UInt16Dtype(),
                self.api.uint32(): pd.UInt32Dtype(),
                self.api.uint64(): pd.UInt64Dtype(),
                self.api.bool_(): pd.BooleanDtype(),
                self.api.string(): pd.StringDtype(),
                self.api.float32(): pd.Float32Dtype(),
                self.api.float64(): pd.Float64Dtype(),
            }
            to_pandas_kwargs["types_mapper"] = mapping.get
        manager = get_option("mode.data_manager")
        if manager == "array":
            to_pandas_kwargs["split_blocks"] = True  # type: ignore[assignment]

        path_or_handle, handles, kwargs["filesystem"] = _get_path_or_handle(
            path,
            kwargs.pop("filesystem", None),
            storage_options=storage_options,
            mode="rb",
        )
        try:
            result = self.api.parquet.read_table(
                path_or_handle, columns=columns, **kwargs
            ).to_pandas(**to_pandas_kwargs)
            if manager == "array":
                result = result._as_manager("array", copy=False)
            return result
        finally:
            if handles is not None:
                handles.close()
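
# Illustrative sketch (not part of the module): with use_nullable_dtypes=True
# the types_mapper above turns pyarrow integer columns containing nulls into
# pandas extension dtypes rather than float64.
#
#     impl = PyArrowImpl()
#     df = impl.read("out.parquet", use_nullable_dtypes=True)
#     df.dtypes             # e.g. Int64 (missing values as pd.NA) instead of
#                           # float64 (missing values as NaN)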

class FastParquetImpl(BaseImpl):
    def __init__(self) -> None:
        # since pandas is a dependency of fastparquet
        # we need to import on first use
        fastparquet = import_optional_dependency(
            "fastparquet", extra="fastparquet is required for parquet support."
        )
        self.api = fastparquet

    def write(
        self,
        df: DataFrame,
        path,
        compression="snappy",
        index=None,
        partition_cols=None,
        storage_options: StorageOptions = None,
        **kwargs,
    ) -> None:
        self.validate_dataframe(df)
        # thriftpy/protocol/compact.py:339:
        # DeprecationWarning: tostring() is deprecated.
        # Use tobytes() instead.

        if "partition_on" in kwargs and partition_cols is not None:
            raise ValueError(
                "Cannot use both partition_on and "
                "partition_cols. Use partition_cols for partitioning data"
            )
        elif "partition_on" in kwargs:
            partition_cols = kwargs.pop("partition_on")

        if partition_cols is not None:
            kwargs["file_scheme"] = "hive"

        # cannot use get_handle as write() does not accept file buffers
        path = stringify_path(path)
        if is_fsspec_url(path):
            fsspec = import_optional_dependency("fsspec")

            # if filesystem is provided by fsspec, file must be opened in 'wb' mode.
            kwargs["open_with"] = lambda path, _: fsspec.open(
                path, "wb", **(storage_options or {})
            ).open()
        elif storage_options:
            raise ValueError(
                "storage_options passed with file object or non-fsspec file path"
            )

        with catch_warnings(record=True):
            self.api.write(
                path,
                df,
                compression=compression,
                write_index=index,
                partition_on=partition_cols,
                **kwargs,
            )

    def read(
        self, path, columns=None, storage_options: StorageOptions = None, **kwargs
    ) -> DataFrame:
        parquet_kwargs: dict[str, Any] = {}
        use_nullable_dtypes = kwargs.pop("use_nullable_dtypes", False)
        if Version(self.api.__version__) >= Version("0.7.1"):
            # We are disabling nullable dtypes for fastparquet pending discussion
            parquet_kwargs["pandas_nulls"] = False
        if use_nullable_dtypes:
            raise ValueError(
                "The 'use_nullable_dtypes' argument is not supported for the "
                "fastparquet engine"
            )
        path = stringify_path(path)
        handles = None
        if is_fsspec_url(path):
            fsspec = import_optional_dependency("fsspec")

            if Version(self.api.__version__) > Version("0.6.1"):
                parquet_kwargs["fs"] = fsspec.open(
                    path, "rb", **(storage_options or {})
                ).fs
            else:
                parquet_kwargs["open_with"] = lambda path, _: fsspec.open(
                    path, "rb", **(storage_options or {})
                ).open()
        elif isinstance(path, str) and not os.path.isdir(path):
            # use get_handle only when we are very certain that it is not a directory
            # fsspec resources can also point to directories
            # this branch is used for example when reading from non-fsspec URLs
            handles = get_handle(
                path, "rb", is_text=False, storage_options=storage_options
            )
            path = handles.handle

        try:
            parquet_file = self.api.ParquetFile(path, **parquet_kwargs)
            return parquet_file.to_pandas(columns=columns, **kwargs)
        finally:
            if handles is not None:
                handles.close()
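
# Illustrative sketch (not part of the module): how the version gating above
# plays out when reading from an fsspec URL with fastparquet >= 0.7.1.
#
#     impl = FastParquetImpl()
#     df = impl.read("s3://bucket/key.parquet",
#                    storage_options={"anon": True})
#     # equivalent to ParquetFile(path, pandas_nulls=False,
#     #                           fs=<fsspec filesystem>).to_pandas()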

@doc(storage_options=_shared_docs["storage_options"])
def to_parquet(
    df: DataFrame,
    path: FilePath | WriteBuffer[bytes] | None = None,
    engine: str = "auto",
    compression: str | None = "snappy",
    index: bool | None = None,
    storage_options: StorageOptions = None,
    partition_cols: list[str] | None = None,
    **kwargs,
) -> bytes | None:
    """
    Write a DataFrame to the parquet format.

    Parameters
    ----------
    df : DataFrame
    path : str, path object, file-like object, or None, default None
        String, path object (implementing ``os.PathLike[str]``), or file-like
        object implementing a binary ``write()`` function. If None, the result is
        returned as bytes. If a string, it will be used as Root Directory path
        when writing a partitioned dataset. The engine fastparquet does not
        accept file-like objects.

        .. versionchanged:: 1.2.0

    engine : {{'auto', 'pyarrow', 'fastparquet'}}, default 'auto'
        Parquet library to use. If 'auto', then the option
        ``io.parquet.engine`` is used. The default ``io.parquet.engine``
        behavior is to try 'pyarrow', falling back to 'fastparquet' if
        'pyarrow' is unavailable.
    compression : {{'snappy', 'gzip', 'brotli', 'lz4', 'zstd', None}},
        default 'snappy'. Name of the compression to use. Use ``None``
        for no compression. The supported compression methods actually
        depend on which engine is used. For 'pyarrow', 'snappy', 'gzip',
        'brotli', 'lz4', 'zstd' are all supported. For 'fastparquet',
        only 'gzip' and 'snappy' are supported.
    index : bool, default None
        If ``True``, include the dataframe's index(es) in the file output. If
        ``False``, they will not be written to the file.
        If ``None``, similar to ``True`` the dataframe's index(es)
        will be saved. However, instead of being saved as values,
        the RangeIndex will be stored as a range in the metadata so it
        doesn't require much space and is faster. Other indexes will
        be included as columns in the file output.
    partition_cols : str or list, optional, default None
        Column names by which to partition the dataset.
        Columns are partitioned in the order they are given.
        Must be None if path is not a string.
    {storage_options}

        .. versionadded:: 1.2.0

    kwargs
        Additional keyword arguments passed to the engine

    Returns
    -------
    bytes if no path argument is provided else None
    """
    if isinstance(partition_cols, str):
        partition_cols = [partition_cols]
    impl = get_engine(engine)

    path_or_buf: FilePath | WriteBuffer[bytes] = io.BytesIO() if path is None else path

    impl.write(
        df,
        path_or_buf,
        compression=compression,
        index=index,
        partition_cols=partition_cols,
        storage_options=storage_options,
        **kwargs,
    )

    if path is None:
        assert isinstance(path_or_buf, io.BytesIO)
        return path_or_buf.getvalue()
    else:
        return None
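
# Illustrative sketch (not part of the module): the two return modes of
# to_parquet. Assumes pyarrow is installed.
#
#     import pandas as pd
#     df = pd.DataFrame({"a": [1, 2]})
#     raw = to_parquet(df)              # path=None -> serialized bytes
#     to_parquet(df, "out.parquet")     # path given -> writes file,
#                                       # returns None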

@doc(storage_options=_shared_docs["storage_options"])
def read_parquet(
    path: FilePath | ReadBuffer[bytes],
    engine: str = "auto",
    columns: list[str] | None = None,
    storage_options: StorageOptions = None,
    use_nullable_dtypes: bool = False,
    **kwargs,
) -> DataFrame:
    """
    Load a parquet object from the file path, returning a DataFrame.

    Parameters
    ----------
    path : str, path object or file-like object
        String, path object (implementing ``os.PathLike[str]``), or file-like
        object implementing a binary ``read()`` function.
        The string could be a URL. Valid URL schemes include http, ftp, s3,
        gs, and file. For file URLs, a host is expected. A local file could be:
        ``file://localhost/path/to/table.parquet``.
        A file URL can also be a path to a directory that contains multiple
        partitioned parquet files. Both pyarrow and fastparquet support
        paths to directories as well as file URLs. A directory path could be:
        ``file://localhost/path/to/tables`` or ``s3://bucket/partition_dir``.
    engine : {{'auto', 'pyarrow', 'fastparquet'}}, default 'auto'
        Parquet library to use. If 'auto', then the option
        ``io.parquet.engine`` is used. The default ``io.parquet.engine``
        behavior is to try 'pyarrow', falling back to 'fastparquet' if
        'pyarrow' is unavailable.
    columns : list, default=None
        If not None, only these columns will be read from the file.

    {storage_options}

        .. versionadded:: 1.3.0

    use_nullable_dtypes : bool, default False
        If True, use dtypes that use ``pd.NA`` as missing value indicator
        for the resulting DataFrame. (only applicable for the ``pyarrow``
        engine)
        As new dtypes are added that support ``pd.NA`` in the future, the
        output with this option will change to use those dtypes.
        Note: this is an experimental option, and behaviour (e.g. additional
        supported dtypes) may change without notice.

        .. versionadded:: 1.2.0

    **kwargs
        Any additional kwargs are passed to the engine.

    Returns
    -------
    DataFrame
    """
    impl = get_engine(engine)

    return impl.read(
        path,
        columns=columns,
        storage_options=storage_options,
        use_nullable_dtypes=use_nullable_dtypes,
        **kwargs,
    )
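
# Illustrative sketch (not part of the module): round-tripping through the
# two module-level entry points. Assumes pyarrow is installed.
#
#     import pandas as pd
#     df = pd.DataFrame({"a": pd.array([1, None, 3], dtype="Int64")})
#     to_parquet(df, "out.parquet", index=False)
#     back = read_parquet("out.parquet", use_nullable_dtypes=True)
#     back["a"].dtype                   # Int64, with pd.NA preserved for
#                                       # the missing row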