Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/pandas/io/parsers/c_parser_wrapper.py: 11%
177 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1from __future__ import annotations
3from collections import defaultdict
4from typing import (
5 TYPE_CHECKING,
6 Hashable,
7 Mapping,
8 Sequence,
9)
10import warnings
12import numpy as np
14import pandas._libs.parsers as parsers
15from pandas._typing import (
16 ArrayLike,
17 DtypeArg,
18 DtypeObj,
19 ReadCsvBuffer,
20)
21from pandas.errors import DtypeWarning
22from pandas.util._exceptions import find_stack_level
24from pandas.core.dtypes.common import (
25 is_categorical_dtype,
26 pandas_dtype,
27)
28from pandas.core.dtypes.concat import union_categoricals
29from pandas.core.dtypes.dtypes import ExtensionDtype
31from pandas.core.indexes.api import ensure_index_from_sequences
33from pandas.io.parsers.base_parser import (
34 ParserBase,
35 is_index_col,
36)
38if TYPE_CHECKING: 38 ↛ 39line 38 didn't jump to line 39, because the condition on line 38 was never true
39 from pandas import (
40 Index,
41 MultiIndex,
42 )
class CParserWrapper(ParserBase):
    """
    Parser engine that wraps the C-implemented ``parsers.TextReader``.

    Tokenizing and type inference are delegated to ``self._reader``; this
    class reconciles the reader's output with the pandas-level options held
    on ``ParserBase`` (``names``, ``index_col``, ``usecols``, date parsing).
    """

    # When True, read() pulls the input in chunks via read_low_memory()
    # and stitches them back together with _concatenate_chunks().
    low_memory: bool
    # The C engine that does the actual parsing.
    _reader: parsers.TextReader

    def __init__(self, src: ReadCsvBuffer[str], **kwds) -> None:
        """
        Build the C reader from *src* and normalize column/index metadata.

        Parameters
        ----------
        src : ReadCsvBuffer[str]
            Open text buffer handed directly to ``parsers.TextReader``.
        **kwds
            read_csv-style options. The original mapping is kept on
            ``self.kwds``; a sanitized copy is forwarded to the C reader.
        """
        super().__init__(kwds)
        self.kwds = kwds
        # Work on a copy: self.kwds is consulted again later
        # (read() re-reads "dtype" from it on an empty first chunk).
        kwds = kwds.copy()

        self.low_memory = kwds.pop("low_memory", False)

        # #2442
        # error: Cannot determine type of 'index_col'
        kwds["allow_leading_cols"] = (
            self.index_col is not False  # type: ignore[has-type]
        )

        # GH20529, validate usecol arg before TextReader
        kwds["usecols"] = self.usecols

        # Have to pass int, would break tests using TextReader directly otherwise :(
        kwds["on_bad_lines"] = self.on_bad_lines.value

        # Options handled at the Python layer; the C reader does not
        # accept them, so strip them from the forwarded copy.
        for key in (
            "storage_options",
            "encoding",
            "memory_map",
            "compression",
            "error_bad_lines",
            "warn_bad_lines",
        ):
            kwds.pop(key, None)

        kwds["dtype"] = ensure_dtype_objs(kwds.get("dtype", None))
        self._reader = parsers.TextReader(src, **kwds)

        self.unnamed_cols = self._reader.unnamed_cols

        # error: Cannot determine type of 'names'
        passed_names = self.names is None  # type: ignore[has-type]

        if self._reader.header is None:
            self.names = None
        else:
            # Header row(s) exist: let the base class split them into
            # column names / index names / MultiIndex column names.
            # error: Cannot determine type of 'names'
            # error: Cannot determine type of 'index_names'
            (
                self.names,  # type: ignore[has-type]
                self.index_names,
                self.col_names,
                passed_names,
            ) = self._extract_multi_indexer_columns(
                self._reader.header,
                self.index_names,  # type: ignore[has-type]
                passed_names,
            )

        # No header and no user-supplied names: synthesize them, either
        # "<prefix>0", "<prefix>1", ... or plain integer positions.
        # error: Cannot determine type of 'names'
        if self.names is None:  # type: ignore[has-type]
            if self.prefix:
                # error: Cannot determine type of 'names'
                self.names = [  # type: ignore[has-type]
                    f"{self.prefix}{i}" for i in range(self._reader.table_width)
                ]
            else:
                # error: Cannot determine type of 'names'
                self.names = list(  # type: ignore[has-type]
                    range(self._reader.table_width)
                )

        # gh-9755
        #
        # need to set orig_names here first
        # so that proper indexing can be done
        # with _set_noconvert_columns
        #
        # once names has been filtered, we will
        # then set orig_names again to names
        # error: Cannot determine type of 'names'
        self.orig_names = self.names[:]  # type: ignore[has-type]

        if self.usecols:
            usecols = self._evaluate_usecols(self.usecols, self.orig_names)

            # GH 14671
            # assert for mypy, orig_names is List or None, None would error in issubset
            assert self.orig_names is not None
            if self.usecols_dtype == "string" and not set(usecols).issubset(
                self.orig_names
            ):
                self._validate_usecols_names(usecols, self.orig_names)

            # Narrow self.names down to the requested subset, matching by
            # either position or name.
            # error: Cannot determine type of 'names'
            if len(self.names) > len(usecols):  # type: ignore[has-type]
                # error: Cannot determine type of 'names'
                self.names = [  # type: ignore[has-type]
                    n
                    # error: Cannot determine type of 'names'
                    for i, n in enumerate(self.names)  # type: ignore[has-type]
                    if (i in usecols or n in usecols)
                ]

            # Fewer names than usecols means some requested columns do not
            # exist — surface that as a validation error.
            # error: Cannot determine type of 'names'
            if len(self.names) < len(usecols):  # type: ignore[has-type]
                # error: Cannot determine type of 'names'
                self._validate_usecols_names(
                    usecols,
                    self.names,  # type: ignore[has-type]
                )

        # error: Cannot determine type of 'names'
        self._validate_parse_dates_presence(self.names)  # type: ignore[has-type]
        self._set_noconvert_columns()

        # Names are now filtered; re-point orig_names at them (see gh-9755
        # note above).
        # error: Cannot determine type of 'names'
        self.orig_names = self.names  # type: ignore[has-type]

        if not self._has_complex_date_col:
            # error: Cannot determine type of 'index_col'
            if self._reader.leading_cols == 0 and is_index_col(
                self.index_col  # type: ignore[has-type]
            ):

                self._name_processed = True
                (
                    index_names,
                    # error: Cannot determine type of 'names'
                    self.names,  # type: ignore[has-type]
                    self.index_col,
                ) = self._clean_index_names(
                    # error: Cannot determine type of 'names'
                    self.names,  # type: ignore[has-type]
                    # error: Cannot determine type of 'index_col'
                    self.index_col,  # type: ignore[has-type]
                )

                if self.index_names is None:
                    self.index_names = index_names

            if self._reader.header is None and not passed_names:
                assert self.index_names is not None
                self.index_names = [None] * len(self.index_names)

        # Leading (unnamed) columns in the parsed data imply an index.
        self._implicit_index = self._reader.leading_cols > 0

    def close(self) -> None:
        """Close handles opened by the C parser."""
        # close handles opened by C parser
        try:
            self._reader.close()
        except ValueError:
            pass

    def _set_noconvert_columns(self) -> None:
        """
        Set the columns that should not undergo dtype conversions.

        Currently, any column that is involved with date parsing will not
        undergo such conversions.
        """
        assert self.orig_names is not None
        # error: Cannot determine type of 'names'

        # much faster than using orig_names.index(x) xref GH#44106
        names_dict = {x: i for i, x in enumerate(self.orig_names)}
        col_indices = [names_dict[x] for x in self.names]  # type: ignore[has-type]
        # error: Cannot determine type of 'names'
        noconvert_columns = self._set_noconvert_dtype_columns(
            col_indices,
            self.names,  # type: ignore[has-type]
        )
        for col in noconvert_columns:
            self._reader.set_noconvert(col)

    def read(
        self,
        nrows: int | None = None,
    ) -> tuple[
        Index | MultiIndex | None,
        Sequence[Hashable] | MultiIndex,
        Mapping[Hashable, ArrayLike],
    ]:
        """
        Read up to *nrows* rows from the underlying C reader.

        Returns
        -------
        tuple of (index, column_names, data_mapping); index is None when no
        index column applies.
        """
        index: Index | MultiIndex | None
        column_names: Sequence[Hashable] | MultiIndex
        try:
            if self.low_memory:
                chunks = self._reader.read_low_memory(nrows)
                # destructive to chunks
                data = _concatenate_chunks(chunks)

            else:
                data = self._reader.read(nrows)
        except StopIteration:
            # StopIteration on the very first read means an empty input:
            # return empty metadata instead of propagating.
            if self._first_chunk:
                self._first_chunk = False
                names = self._maybe_dedup_names(self.orig_names)
                index, columns, col_dict = self._get_empty_meta(
                    names,
                    self.index_col,
                    self.index_names,
                    dtype=self.kwds.get("dtype"),
                )
                columns = self._maybe_make_multi_index_columns(columns, self.col_names)

                if self.usecols is not None:
                    columns = self._filter_usecols(columns)

                col_dict = {k: v for k, v in col_dict.items() if k in columns}

                return index, columns, col_dict

            else:
                self.close()
                raise

        # Done with first read, next time raise StopIteration
        self._first_chunk = False

        # error: Cannot determine type of 'names'
        names = self.names  # type: ignore[has-type]

        if self._reader.leading_cols:
            if self._has_complex_date_col:
                raise NotImplementedError("file structure not yet supported")

            # implicit index, no index names
            arrays = []

            # Pop the leading columns out of `data` (destructive) and turn
            # them into the index, date-parsing each as needed.
            for i in range(self._reader.leading_cols):
                if self.index_col is None:
                    values = data.pop(i)
                else:
                    values = data.pop(self.index_col[i])

                values = self._maybe_parse_dates(values, i, try_parse_dates=True)
                arrays.append(values)

            index = ensure_index_from_sequences(arrays)

            if self.usecols is not None:
                names = self._filter_usecols(names)

            names = self._maybe_dedup_names(names)

            # rename dict keys
            data_tups = sorted(data.items())
            data = {k: v for k, (i, v) in zip(names, data_tups)}

            column_names, date_data = self._do_date_conversions(names, data)

            # maybe create a mi on the columns
            column_names = self._maybe_make_multi_index_columns(
                column_names, self.col_names
            )

        else:
            # rename dict keys
            data_tups = sorted(data.items())

            # ugh, mutation

            # assert for mypy, orig_names is List or None, None would error in list(...)
            assert self.orig_names is not None
            names = list(self.orig_names)
            names = self._maybe_dedup_names(names)

            if self.usecols is not None:
                names = self._filter_usecols(names)

            # columns as list
            alldata = [x[1] for x in data_tups]
            if self.usecols is None:
                self._check_data_length(names, alldata)

            data = {k: v for k, (i, v) in zip(names, data_tups)}

            names, date_data = self._do_date_conversions(names, data)
            index, column_names = self._make_index(date_data, alldata, names)

        return index, column_names, date_data

    def _filter_usecols(self, names: Sequence[Hashable]) -> Sequence[Hashable]:
        """Keep only the names selected by ``self.usecols`` (by position or name)."""
        # hackish
        usecols = self._evaluate_usecols(self.usecols, names)
        if usecols is not None and len(names) != len(usecols):
            names = [
                name for i, name in enumerate(names) if i in usecols or name in usecols
            ]
        return names

    def _get_index_names(self):
        """Return (column names, index names) derived from the first header row."""
        names = list(self._reader.header[0])
        idx_names = None

        if self._reader.leading_cols == 0 and self.index_col is not None:
            (idx_names, names, self.index_col) = self._clean_index_names(
                names, self.index_col
            )

        return names, idx_names

    def _maybe_parse_dates(self, values, index: int, try_parse_dates: bool = True):
        """Apply the date converter to *values* if column *index* should be date-parsed."""
        if try_parse_dates and self._should_parse_dates(index):
            values = self._date_conv(values)
        return values
351def _concatenate_chunks(chunks: list[dict[int, ArrayLike]]) -> dict:
352 """
353 Concatenate chunks of data read with low_memory=True.
355 The tricky part is handling Categoricals, where different chunks
356 may have different inferred categories.
357 """
358 names = list(chunks[0].keys())
359 warning_columns = []
361 result: dict = {}
362 for name in names:
363 arrs = [chunk.pop(name) for chunk in chunks]
364 # Check each arr for consistent types.
365 dtypes = {a.dtype for a in arrs}
366 # TODO: shouldn't we exclude all EA dtypes here?
367 numpy_dtypes = {x for x in dtypes if not is_categorical_dtype(x)}
368 if len(numpy_dtypes) > 1:
369 # error: Argument 1 to "find_common_type" has incompatible type
370 # "Set[Any]"; expected "Sequence[Union[dtype[Any], None, type,
371 # _SupportsDType, str, Union[Tuple[Any, int], Tuple[Any,
372 # Union[int, Sequence[int]]], List[Any], _DTypeDict, Tuple[Any, Any]]]]"
373 common_type = np.find_common_type(
374 numpy_dtypes, # type: ignore[arg-type]
375 [],
376 )
377 if common_type == np.dtype(object):
378 warning_columns.append(str(name))
380 dtype = dtypes.pop()
381 if is_categorical_dtype(dtype):
382 result[name] = union_categoricals(arrs, sort_categories=False)
383 else:
384 if isinstance(dtype, ExtensionDtype):
385 # TODO: concat_compat?
386 array_type = dtype.construct_array_type()
387 # error: Argument 1 to "_concat_same_type" of "ExtensionArray"
388 # has incompatible type "List[Union[ExtensionArray, ndarray]]";
389 # expected "Sequence[ExtensionArray]"
390 result[name] = array_type._concat_same_type(
391 arrs # type: ignore[arg-type]
392 )
393 else:
394 # error: Argument 1 to "concatenate" has incompatible
395 # type "List[Union[ExtensionArray, ndarray[Any, Any]]]"
396 # ; expected "Union[_SupportsArray[dtype[Any]],
397 # Sequence[_SupportsArray[dtype[Any]]],
398 # Sequence[Sequence[_SupportsArray[dtype[Any]]]],
399 # Sequence[Sequence[Sequence[_SupportsArray[dtype[Any]]]]]
400 # , Sequence[Sequence[Sequence[Sequence[
401 # _SupportsArray[dtype[Any]]]]]]]"
402 result[name] = np.concatenate(arrs) # type: ignore[arg-type]
404 if warning_columns:
405 warning_names = ",".join(warning_columns)
406 warning_message = " ".join(
407 [
408 f"Columns ({warning_names}) have mixed types. "
409 f"Specify dtype option on import or set low_memory=False."
410 ]
411 )
412 warnings.warn(warning_message, DtypeWarning, stacklevel=find_stack_level())
413 return result
def ensure_dtype_objs(
    dtype: DtypeArg | dict[Hashable, DtypeArg] | None
) -> DtypeObj | dict[Hashable, DtypeObj] | None:
    """
    Ensure we have either None, a dtype object, or a dictionary mapping to
    dtype objects.
    """
    if isinstance(dtype, defaultdict):
        # Convert the factory's output once up front, and hand back a new
        # defaultdict whose factory yields that converted dtype; explicit
        # entries are converted individually below.
        # "None" not callable [misc]
        converted_default = pandas_dtype(dtype.default_factory())  # type: ignore[misc]
        result: defaultdict = defaultdict(lambda: converted_default)
        result.update({key: pandas_dtype(val) for key, val in dtype.items()})
        return result
    if isinstance(dtype, dict):
        return {key: pandas_dtype(val) for key, val in dtype.items()}
    if dtype is not None:
        return pandas_dtype(dtype)
    return dtype