Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/pandas/io/parsers/arrow_parser_wrapper.py: 16%
65 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1from __future__ import annotations
3from typing import TYPE_CHECKING
5from pandas._typing import ReadBuffer
6from pandas.compat._optional import import_optional_dependency
8from pandas.core.dtypes.inference import is_integer
10from pandas.io.parsers.base_parser import ParserBase
12if TYPE_CHECKING: 12 ↛ 13line 12 didn't jump to line 13, because the condition on line 12 was never true
13 from pandas import DataFrame
class ArrowParserWrapper(ParserBase):
    """
    Wrapper for the pyarrow engine for read_csv()
    """

    def __init__(self, src: ReadBuffer[bytes], **kwds) -> None:
        """
        Store the source and keyword options, then validate the options.

        Parameters
        ----------
        src : ReadBuffer[bytes]
            The object later handed to ``pyarrow.csv.read_csv`` by ``read``.
        **kwds
            Parser options. They are forwarded to ``ParserBase.__init__``
            as a single dict and also kept on ``self.kwds``.
        """
        super().__init__(kwds)
        self.kwds = kwds
        self.src = src

        self._parse_kwds()

    def _parse_kwds(self):
        """
        Validates keywords before passing to pyarrow.

        Sets ``self.encoding`` (``"utf-8"`` when no encoding was given),
        ``self.usecols`` / ``self.usecols_dtype`` and ``self.na_values``.

        Raises
        ------
        ValueError
            If ``na_values`` is a dict.
        """
        encoding: str | None = self.kwds.get("encoding")
        self.encoding = "utf-8" if encoding is None else encoding

        self.usecols, self.usecols_dtype = self._validate_usecols_arg(
            self.kwds["usecols"]
        )
        na_values = self.kwds["na_values"]
        if isinstance(na_values, dict):
            raise ValueError(
                "The pyarrow engine doesn't support passing a dict for na_values"
            )
        # Reuse the value that was just validated instead of a second lookup.
        self.na_values = list(na_values)

    def _get_pyarrow_options(self):
        """
        Rename some arguments to pass to pyarrow

        Keys of ``self.kwds`` that have a pyarrow counterpart are renamed in
        place (only when their value is not None). The options are then split
        into ``self.parse_options``, ``self.convert_options`` and
        ``self.read_options``, which ``read`` unpacks into the matching
        pyarrow option objects.
        """
        mapping = {
            "usecols": "include_columns",
            "na_values": "null_values",
            "escapechar": "escape_char",
            "skip_blank_lines": "ignore_empty_lines",
        }
        for pandas_name, pyarrow_name in mapping.items():
            # ``get`` returns None for a missing key as well, so one check
            # covers both "absent" and "explicitly None".
            if self.kwds.get(pandas_name) is not None:
                self.kwds[pyarrow_name] = self.kwds.pop(pandas_name)

        parse_names = ("delimiter", "quote_char", "escape_char", "ignore_empty_lines")
        convert_names = (
            "include_columns",
            "null_values",
            "true_values",
            "false_values",
        )
        # Only options that were actually set (not None) are forwarded.
        self.parse_options = {
            option_name: option_value
            for option_name, option_value in self.kwds.items()
            if option_value is not None and option_name in parse_names
        }
        self.convert_options = {
            option_name: option_value
            for option_name, option_value in self.kwds.items()
            if option_value is not None and option_name in convert_names
        }
        self.read_options = {
            "autogenerate_column_names": self.header is None,
            "skip_rows": self.header
            if self.header is not None
            else self.kwds["skiprows"],
        }

    def _finalize_output(self, frame: DataFrame) -> DataFrame:
        """
        Processes data read in based on kwargs.

        Parameters
        ----------
        frame: DataFrame
            The DataFrame to process.

        Returns
        -------
        DataFrame
            The processed DataFrame.

        Raises
        ------
        ValueError
            If a non-integer ``index_col`` entry is not a column of the
            frame, or if casting to the requested ``dtype`` raises a
            ``TypeError``.
        """
        num_cols = len(frame.columns)
        multi_index_named = True
        if self.header is None:
            if self.names is None:
                if self.prefix is not None:
                    self.names = [f"{self.prefix}{i}" for i in range(num_cols)]
                else:
                    # header is already known to be None in this branch
                    self.names = range(num_cols)
            if len(self.names) != num_cols:
                # usecols is passed through to pyarrow, we only handle index col here
                # The only way self.names is not the same length as number of cols is
                # if we have int index_col. We should just pad the names(they will get
                # removed anyways) to expected length then.
                self.names = list(range(num_cols - len(self.names))) + self.names
                multi_index_named = False
            frame.columns = self.names
        # we only need the frame not the names
        frame.columns, frame = self._do_date_conversions(frame.columns, frame)
        if self.index_col is not None:
            for i, item in enumerate(self.index_col):
                if is_integer(item):
                    # Replace a positional index with the column label.
                    self.index_col[i] = frame.columns[item]
                elif item not in frame.columns:
                    # String case
                    raise ValueError(f"Index {item} invalid")
            frame.set_index(self.index_col, drop=True, inplace=True)
            # Clear names if headerless and no name given
            if self.header is None and not multi_index_named:
                frame.index.names = [None] * len(frame.index.names)

        dtype = self.kwds.get("dtype")
        if dtype is not None:
            try:
                frame = frame.astype(dtype)
            except TypeError as e:
                # GH#44901 reraise to keep api consistent; chain explicitly so
                # the original TypeError is recorded as the cause.
                raise ValueError(e) from e
        return frame

    def read(self) -> DataFrame:
        """
        Reads the contents of a CSV file into a DataFrame and
        processes it according to the kwargs passed in the
        constructor.

        pyarrow is imported lazily through ``import_optional_dependency``.

        Returns
        -------
        DataFrame
            The DataFrame created from the CSV file.
        """
        pyarrow_csv = import_optional_dependency("pyarrow.csv")
        self._get_pyarrow_options()

        table = pyarrow_csv.read_csv(
            self.src,
            read_options=pyarrow_csv.ReadOptions(**self.read_options),
            parse_options=pyarrow_csv.ParseOptions(**self.parse_options),
            convert_options=pyarrow_csv.ConvertOptions(**self.convert_options),
        )

        frame = table.to_pandas()
        return self._finalize_output(frame)