Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/pandas/io/parsers/python_parser.py: 6% of 639 statements (coverage.py v6.4.4, created at 2023-07-17 14:22 -0600)
from __future__ import annotations

from collections import (
    abc,
    defaultdict,
)
import csv
from io import StringIO
import re
import sys
from typing import (
    IO,
    TYPE_CHECKING,
    DefaultDict,
    Hashable,
    Iterator,
    List,
    Literal,
    Mapping,
    Sequence,
    cast,
)
import warnings

import numpy as np

import pandas._libs.lib as lib
from pandas._typing import (
    ArrayLike,
    ReadCsvBuffer,
    Scalar,
)
from pandas.errors import (
    EmptyDataError,
    ParserError,
)
from pandas.util._exceptions import find_stack_level

from pandas.core.dtypes.common import is_integer
from pandas.core.dtypes.inference import is_dict_like

from pandas.io.parsers.base_parser import (
    ParserBase,
    parser_defaults,
)

if TYPE_CHECKING:
    from pandas import (
        Index,
        MultiIndex,
    )

# BOM character (byte order mark)
# This exists at the beginning of a file to indicate endianness
# of a file (stream). Unfortunately, this marker screws up parsing,
# so we need to remove it if we see it.
_BOM = "\ufeff"


class PythonParser(ParserBase):
    def __init__(self, f: ReadCsvBuffer[str] | list, **kwds) -> None:
        """
        Workhorse function for processing nested list into DataFrame
        """
        super().__init__(kwds)

        self.data: Iterator[str] | None = None
        self.buf: list = []
        self.pos = 0
        self.line_pos = 0

        self.skiprows = kwds["skiprows"]

        if callable(self.skiprows):
            self.skipfunc = self.skiprows
        else:
            self.skipfunc = lambda x: x in self.skiprows

        self.skipfooter = _validate_skipfooter_arg(kwds["skipfooter"])
        self.delimiter = kwds["delimiter"]

        self.quotechar = kwds["quotechar"]
        if isinstance(self.quotechar, str):
            self.quotechar = str(self.quotechar)

        self.escapechar = kwds["escapechar"]
        self.doublequote = kwds["doublequote"]
        self.skipinitialspace = kwds["skipinitialspace"]
        self.lineterminator = kwds["lineterminator"]
        self.quoting = kwds["quoting"]
        self.skip_blank_lines = kwds["skip_blank_lines"]

        self.names_passed = kwds["names"] or None

        self.has_index_names = False
        if "has_index_names" in kwds:
            self.has_index_names = kwds["has_index_names"]

        self.verbose = kwds["verbose"]

        self.thousands = kwds["thousands"]
        self.decimal = kwds["decimal"]

        self.comment = kwds["comment"]

        # Set self.data to something that can read lines.
        if isinstance(f, list):
            # read_excel: f is a list
            self.data = cast(Iterator[str], f)
        else:
            assert hasattr(f, "readline")
            self._make_reader(f)

        # Get columns in two steps: infer from data, then
        # infer column indices from self.usecols if it is specified.
        self._col_indices: list[int] | None = None
        columns: list[list[Scalar | None]]
        (
            columns,
            self.num_original_columns,
            self.unnamed_cols,
        ) = self._infer_columns()

        # Now self.columns has the set of columns that we will process.
        # The original set is stored in self.original_columns.
        # error: Cannot determine type of 'index_names'
        self.columns: list[Hashable]
        (
            self.columns,
            self.index_names,
            self.col_names,
            _,
        ) = self._extract_multi_indexer_columns(
            columns,
            self.index_names,  # type: ignore[has-type]
        )

        # get popped off for index
        self.orig_names: list[Hashable] = list(self.columns)

        # needs to be cleaned/refactored
        # multiple date column thing turning into a real spaghetti factory

        if not self._has_complex_date_col:
            (index_names, self.orig_names, self.columns) = self._get_index_name(
                self.columns
            )
            self._name_processed = True
            if self.index_names is None:
                self.index_names = index_names

        if self._col_indices is None:
            self._col_indices = list(range(len(self.columns)))

        self._parse_date_cols = self._validate_parse_dates_presence(self.columns)
        no_thousands_columns: set[int] | None = None
        if self.parse_dates:
            no_thousands_columns = self._set_noconvert_dtype_columns(
                self._col_indices, self.columns
            )
        self._no_thousands_columns = no_thousands_columns

        if len(self.decimal) != 1:
            raise ValueError("Only length-1 decimal markers supported")

        decimal = re.escape(self.decimal)
        if self.thousands is None:
            regex = rf"^[\-\+]?[0-9]*({decimal}[0-9]*)?([0-9]?(E|e)\-?[0-9]+)?$"
        else:
            thousands = re.escape(self.thousands)
            regex = (
                rf"^[\-\+]?([0-9]+{thousands}|[0-9])*({decimal}[0-9]*)?"
                rf"([0-9]?(E|e)\-?[0-9]+)?$"
            )
        self.num = re.compile(regex)
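
    # For illustration, with thousands="," and decimal="." the regex above
    # reduces to the pattern below; it accepts plain numeric strings and
    # rejects anything else, which is what later gates thousands/decimal
    # replacement in _search_replace_num_columns:
    #
    #     >>> num = re.compile(
    #     ...     r"^[\-\+]?([0-9]+,|[0-9])*(\.[0-9]*)?([0-9]?(E|e)\-?[0-9]+)?$"
    #     ... )
    #     >>> bool(num.search("1,234.5")), bool(num.search("12ab"))
    #     (True, False)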

    def _make_reader(self, f: IO[str] | ReadCsvBuffer[str]) -> None:
        sep = self.delimiter

        if sep is None or len(sep) == 1:
            if self.lineterminator:
                raise ValueError(
                    "Custom line terminators not supported in python parser (yet)"
                )

            class MyDialect(csv.Dialect):
                delimiter = self.delimiter
                quotechar = self.quotechar
                escapechar = self.escapechar
                doublequote = self.doublequote
                skipinitialspace = self.skipinitialspace
                quoting = self.quoting
                lineterminator = "\n"

            dia = MyDialect

            if sep is not None:
                dia.delimiter = sep
            else:
                # attempt to sniff the delimiter from the first valid line,
                # i.e. no comment line and not in skiprows
                line = f.readline()
                lines = self._check_comments([[line]])[0]
                while self.skipfunc(self.pos) or not lines:
                    self.pos += 1
                    line = f.readline()
                    lines = self._check_comments([[line]])[0]
                lines_str = cast(List[str], lines)

                # since `line` was a string, lines will be a list containing
                # only a single string
                line = lines_str[0]

                self.pos += 1
                self.line_pos += 1
                sniffed = csv.Sniffer().sniff(line)
                dia.delimiter = sniffed.delimiter

                # Note: encoding is irrelevant here
                line_rdr = csv.reader(StringIO(line), dialect=dia)
                self.buf.extend(list(line_rdr))

            # Note: encoding is irrelevant here
            reader = csv.reader(f, dialect=dia, strict=True)

        else:

            def _read():
                line = f.readline()
                pat = re.compile(sep)

                yield pat.split(line.strip())

                for line in f:
                    yield pat.split(line.strip())

            reader = _read()

        # error: Incompatible types in assignment (expression has type "_reader",
        # variable has type "Union[IO[Any], RawIOBase, BufferedIOBase, TextIOBase,
        # TextIOWrapper, mmap, None]")
        self.data = reader  # type: ignore[assignment]
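
    # For illustration, the regex-separator branch above splits each line
    # with re.split, so a multi-character sep behaves as a pattern:
    #
    #     >>> re.compile(r"\s+").split("1  2\t3".strip())
    #     ['1', '2', '3']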

    def read(
        self, rows: int | None = None
    ) -> tuple[
        Index | None, Sequence[Hashable] | MultiIndex, Mapping[Hashable, ArrayLike]
    ]:
        try:
            content = self._get_lines(rows)
        except StopIteration:
            if self._first_chunk:
                content = []
            else:
                self.close()
                raise

        # done with first read, next time raise StopIteration
        self._first_chunk = False

        columns: Sequence[Hashable] = list(self.orig_names)
        if not len(content):  # pragma: no cover
            # DataFrame with the right metadata, even though it's length 0
            names = self._maybe_dedup_names(self.orig_names)
            # error: Cannot determine type of 'index_col'
            index, columns, col_dict = self._get_empty_meta(
                names,
                self.index_col,  # type: ignore[has-type]
                self.index_names,
                self.dtype,
            )
            conv_columns = self._maybe_make_multi_index_columns(columns, self.col_names)
            return index, conv_columns, col_dict

        # handle new style for names in index
        count_empty_content_vals = count_empty_vals(content[0])
        indexnamerow = None
        if self.has_index_names and count_empty_content_vals == len(columns):
            indexnamerow = content[0]
            content = content[1:]

        alldata = self._rows_to_cols(content)
        data, columns = self._exclude_implicit_index(alldata)

        conv_data = self._convert_data(data)
        columns, conv_data = self._do_date_conversions(columns, conv_data)

        index, result_columns = self._make_index(
            conv_data, alldata, columns, indexnamerow
        )

        return index, result_columns, conv_data
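
    # Usage sketch: this class backs pd.read_csv(..., engine="python"), so a
    # minimal round trip through read() above looks like:
    #
    #     >>> import io
    #     >>> import pandas as pd
    #     >>> pd.read_csv(io.StringIO("a,b\n1,2\n"), engine="python")
    #        a  b
    #     0  1  2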

    def _exclude_implicit_index(
        self,
        alldata: list[np.ndarray],
    ) -> tuple[Mapping[Hashable, np.ndarray], Sequence[Hashable]]:
        names = self._maybe_dedup_names(self.orig_names)

        offset = 0
        if self._implicit_index:
            # error: Cannot determine type of 'index_col'
            offset = len(self.index_col)  # type: ignore[has-type]

        len_alldata = len(alldata)
        self._check_data_length(names, alldata)

        return {
            name: alldata[i + offset] for i, name in enumerate(names) if i < len_alldata
        }, names

    # legacy
    def get_chunk(
        self, size: int | None = None
    ) -> tuple[
        Index | None, Sequence[Hashable] | MultiIndex, Mapping[Hashable, ArrayLike]
    ]:
        if size is None:
            # error: "PythonParser" has no attribute "chunksize"
            size = self.chunksize  # type: ignore[attr-defined]
        return self.read(rows=size)

    def _convert_data(
        self,
        data: Mapping[Hashable, np.ndarray],
    ) -> Mapping[Hashable, ArrayLike]:
        # apply converters
        clean_conv = self._clean_mapping(self.converters)
        clean_dtypes = self._clean_mapping(self.dtype)

        # Apply NA values.
        clean_na_values = {}
        clean_na_fvalues = {}

        if isinstance(self.na_values, dict):
            for col in self.na_values:
                na_value = self.na_values[col]
                na_fvalue = self.na_fvalues[col]

                if isinstance(col, int) and col not in self.orig_names:
                    col = self.orig_names[col]

                clean_na_values[col] = na_value
                clean_na_fvalues[col] = na_fvalue
        else:
            clean_na_values = self.na_values
            clean_na_fvalues = self.na_fvalues

        return self._convert_to_ndarrays(
            data,
            clean_na_values,
            clean_na_fvalues,
            self.verbose,
            clean_conv,
            clean_dtypes,
        )

    def _infer_columns(
        self,
    ) -> tuple[list[list[Scalar | None]], int, set[Scalar | None]]:
        names = self.names
        num_original_columns = 0
        clear_buffer = True
        unnamed_cols: set[Scalar | None] = set()
        self._header_line = None

        if self.header is not None:
            header = self.header

            if isinstance(header, (list, tuple, np.ndarray)):
                have_mi_columns = len(header) > 1
                # we have a mi columns, so read an extra line
                if have_mi_columns:
                    header = list(header) + [header[-1] + 1]
            else:
                have_mi_columns = False
                header = [header]

            columns: list[list[Scalar | None]] = []
            for level, hr in enumerate(header):
                try:
                    line = self._buffered_line()

                    while self.line_pos <= hr:
                        line = self._next_line()

                except StopIteration as err:
                    if 0 < self.line_pos <= hr and (
                        not have_mi_columns or hr != header[-1]
                    ):
                        # If no rows we want to raise a different message and if
                        # we have mi columns, the last line is not part of the header
                        joi = list(map(str, header[:-1] if have_mi_columns else header))
                        msg = f"[{','.join(joi)}], len of {len(joi)}, "
                        raise ValueError(
                            f"Passed header={msg}"
                            f"but only {self.line_pos} lines in file"
                        ) from err

                    # We have an empty file, so check
                    # if columns are provided. That will
                    # serve as the 'line' for parsing
                    if have_mi_columns and hr > 0:
                        if clear_buffer:
                            self._clear_buffer()
                        columns.append([None] * len(columns[-1]))
                        return columns, num_original_columns, unnamed_cols

                    if not self.names:
                        raise EmptyDataError("No columns to parse from file") from err

                    line = self.names[:]

                this_columns: list[Scalar | None] = []
                this_unnamed_cols = []

                for i, c in enumerate(line):
                    if c == "":
                        if have_mi_columns:
                            col_name = f"Unnamed: {i}_level_{level}"
                        else:
                            col_name = f"Unnamed: {i}"

                        this_unnamed_cols.append(i)
                        this_columns.append(col_name)
                    else:
                        this_columns.append(c)

                if not have_mi_columns and self.mangle_dupe_cols:
                    counts: DefaultDict = defaultdict(int)
                    # Ensure that regular columns are used before unnamed ones
                    # to keep given names and mangle unnamed columns
                    col_loop_order = [
                        i
                        for i in range(len(this_columns))
                        if i not in this_unnamed_cols
                    ] + this_unnamed_cols

                    for i in col_loop_order:
                        col = this_columns[i]
                        old_col = col
                        cur_count = counts[col]

                        if cur_count > 0:
                            while cur_count > 0:
                                counts[old_col] = cur_count + 1
                                col = f"{old_col}.{cur_count}"
                                if col in this_columns:
                                    cur_count += 1
                                else:
                                    cur_count = counts[col]

                            if (
                                self.dtype is not None
                                and is_dict_like(self.dtype)
                                and self.dtype.get(old_col) is not None
                                and self.dtype.get(col) is None
                            ):
                                self.dtype.update({col: self.dtype.get(old_col)})
                        this_columns[i] = col
                        counts[col] = cur_count + 1
                elif have_mi_columns:

                    # if we have grabbed an extra line, but it's not in our
                    # format, save it in the buffer and create a blank extra
                    # line for the rest of the parsing code
                    if hr == header[-1]:
                        lc = len(this_columns)
                        # error: Cannot determine type of 'index_col'
                        sic = self.index_col  # type: ignore[has-type]
                        ic = len(sic) if sic is not None else 0
                        unnamed_count = len(this_unnamed_cols)

                        # if wrong number of blanks or no index, not our format
                        if (lc != unnamed_count and lc - ic > unnamed_count) or ic == 0:
                            clear_buffer = False
                            this_columns = [None] * lc
                            self.buf = [self.buf[-1]]

                columns.append(this_columns)
                unnamed_cols.update({this_columns[i] for i in this_unnamed_cols})

                if len(columns) == 1:
                    num_original_columns = len(this_columns)

            if clear_buffer:
                self._clear_buffer()

            first_line: list[Scalar] | None
            if names is not None:
                # Read first row after header to check if data are longer
                try:
                    first_line = self._next_line()
                except StopIteration:
                    first_line = None

                len_first_data_row = 0 if first_line is None else len(first_line)

                if len(names) > len(columns[0]) and len(names) > len_first_data_row:
                    raise ValueError(
                        "Number of passed names did not match "
                        "number of header fields in the file"
                    )
                if len(columns) > 1:
                    raise TypeError("Cannot pass names with multi-index columns")

                if self.usecols is not None:
                    # Set _use_cols. We don't store columns because they are
                    # overwritten.
                    self._handle_usecols(columns, names, num_original_columns)
                else:
                    num_original_columns = len(names)
                    if self._col_indices is not None and len(names) != len(
                        self._col_indices
                    ):
                        columns = [[names[i] for i in sorted(self._col_indices)]]
                    else:
                        columns = [names]
            else:
                columns = self._handle_usecols(
                    columns, columns[0], num_original_columns
                )
        else:
            try:
                line = self._buffered_line()

            except StopIteration as err:
                if not names:
                    raise EmptyDataError("No columns to parse from file") from err

                line = names[:]

            # Store line, otherwise it is lost for guessing the index
            self._header_line = line
            ncols = len(line)
            num_original_columns = ncols

            if not names:
                if self.prefix:
                    columns = [[f"{self.prefix}{i}" for i in range(ncols)]]
                else:
                    columns = [list(range(ncols))]
                columns = self._handle_usecols(
                    columns, columns[0], num_original_columns
                )
            else:
                if self.usecols is None or len(names) >= num_original_columns:
                    columns = self._handle_usecols([names], names, num_original_columns)
                    num_original_columns = len(names)
                else:
                    if not callable(self.usecols) and len(names) != len(self.usecols):
                        raise ValueError(
                            "Number of passed names did not match number of "
                            "header fields in the file"
                        )
                    # Ignore output but set used columns.
                    self._handle_usecols([names], names, ncols)
                    columns = [names]
                    num_original_columns = ncols

        return columns, num_original_columns, unnamed_cols

    def _handle_usecols(
        self,
        columns: list[list[Scalar | None]],
        usecols_key: list[Scalar | None],
        num_original_columns: int,
    ) -> list[list[Scalar | None]]:
        """
        Sets self._col_indices

        usecols_key is used if there are string usecols.
        """
        col_indices: set[int] | list[int]
        if self.usecols is not None:
            if callable(self.usecols):
                col_indices = self._evaluate_usecols(self.usecols, usecols_key)
            elif any(isinstance(u, str) for u in self.usecols):
                if len(columns) > 1:
                    raise ValueError(
                        "If using multiple headers, usecols must be integers."
                    )
                col_indices = []

                for col in self.usecols:
                    if isinstance(col, str):
                        try:
                            col_indices.append(usecols_key.index(col))
                        except ValueError:
                            self._validate_usecols_names(self.usecols, usecols_key)
                    else:
                        col_indices.append(col)
            else:
                missing_usecols = [
                    col for col in self.usecols if col >= num_original_columns
                ]
                if missing_usecols:
                    warnings.warn(
                        "Defining usecols with out of bounds indices is deprecated "
                        "and will raise a ParserError in a future version.",
                        FutureWarning,
                        stacklevel=find_stack_level(),
                    )
                col_indices = self.usecols

            columns = [
                [n for i, n in enumerate(column) if i in col_indices]
                for column in columns
            ]
            self._col_indices = sorted(col_indices)
        return columns

    def _buffered_line(self) -> list[Scalar]:
        """
        Return a line from buffer, filling buffer if required.
        """
        if len(self.buf) > 0:
            return self.buf[0]
        else:
            return self._next_line()

    def _check_for_bom(self, first_row: list[Scalar]) -> list[Scalar]:
        """
        Checks whether the file begins with the BOM character.
        If it does, remove it. In addition, if there is quoting
        in the field subsequent to the BOM, remove it as well
        because it technically takes place at the beginning of
        the name, not the middle of it.
        """
        # first_row will be a list, so we need to check
        # that that list is not empty before proceeding.
        if not first_row:
            return first_row

        # The first element of this row is the one that could have the
        # BOM that we want to remove. Check that the first element is a
        # string before proceeding.
        if not isinstance(first_row[0], str):
            return first_row

        # Check that the string is not empty, as that would
        # obviously not have a BOM at the start of it.
        if not first_row[0]:
            return first_row

        # Since the string is non-empty, check that it does
        # in fact begin with a BOM.
        first_elt = first_row[0][0]
        if first_elt != _BOM:
            return first_row

        first_row_bom = first_row[0]
        new_row: str

        if len(first_row_bom) > 1 and first_row_bom[1] == self.quotechar:
            start = 2
            quote = first_row_bom[1]
            end = first_row_bom[2:].index(quote) + 2

            # Extract the data between the quotation marks
            new_row = first_row_bom[start:end]

            # Extract any remaining data after the second
            # quotation mark.
            if len(first_row_bom) > end + 1:
                new_row += first_row_bom[end + 1 :]

        else:

            # No quotation so just remove BOM from first element
            new_row = first_row_bom[1:]

        new_row_list: list[Scalar] = [new_row]
        return new_row_list + first_row[1:]
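
    # Worked example of the quoted-BOM path above: for a first field of
    # '\ufeff"ab"cd' with quotechar '"', start=2 and end=4, so the BOM and
    # both quotes are dropped:
    #
    #     >>> row = "\ufeff\"ab\"cd"
    #     >>> row[2:4] + row[5:]
    #     'abcd'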

    def _is_line_empty(self, line: list[Scalar]) -> bool:
        """
        Check if a line is empty or not.

        Parameters
        ----------
        line : str, array-like
            The line of data to check.

        Returns
        -------
        boolean : Whether or not the line is empty.
        """
        return not line or all(not x for x in line)
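
    # For example, [] and ["", ""] count as empty here, while ["", "a"]
    # does not, because "a" is truthy.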

    def _next_line(self) -> list[Scalar]:
        if isinstance(self.data, list):
            while self.skipfunc(self.pos):
                if self.pos >= len(self.data):
                    break
                self.pos += 1

            while True:
                try:
                    line = self._check_comments([self.data[self.pos]])[0]
                    self.pos += 1
                    # either uncommented or blank to begin with
                    if not self.skip_blank_lines and (
                        self._is_line_empty(self.data[self.pos - 1]) or line
                    ):
                        break
                    elif self.skip_blank_lines:
                        ret = self._remove_empty_lines([line])
                        if ret:
                            line = ret[0]
                            break
                except IndexError:
                    raise StopIteration
        else:
            while self.skipfunc(self.pos):
                self.pos += 1
                # assert for mypy, data is Iterator[str] or None, would error in next
                assert self.data is not None
                next(self.data)

            while True:
                orig_line = self._next_iter_line(row_num=self.pos + 1)
                self.pos += 1

                if orig_line is not None:
                    line = self._check_comments([orig_line])[0]

                    if self.skip_blank_lines:
                        ret = self._remove_empty_lines([line])

                        if ret:
                            line = ret[0]
                            break
                    elif self._is_line_empty(orig_line) or line:
                        break

        # This was the first line of the file,
        # which could contain the BOM at the
        # beginning of it.
        if self.pos == 1:
            line = self._check_for_bom(line)

        self.line_pos += 1
        self.buf.append(line)
        return line

    def _alert_malformed(self, msg: str, row_num: int) -> None:
        """
        Alert a user about a malformed row, depending on value of
        `self.on_bad_lines` enum.

        If `self.on_bad_lines` is ERROR, the alert will be `ParserError`.
        If `self.on_bad_lines` is WARN, the alert will be printed out.

        Parameters
        ----------
        msg: str
            The error message to display.
        row_num: int
            The row number where the parsing error occurred.
            Because this row number is displayed, we 1-index,
            even though we 0-index internally.
        """
        if self.on_bad_lines == self.BadLineHandleMethod.ERROR:
            raise ParserError(msg)
        elif self.on_bad_lines == self.BadLineHandleMethod.WARN:
            base = f"Skipping line {row_num}: "
            sys.stderr.write(base + msg + "\n")

    def _next_iter_line(self, row_num: int) -> list[Scalar] | None:
        """
        Wrapper around iterating through `self.data` (CSV source).

        When a CSV error is raised, we check for specific
        error messages that allow us to customize the
        error message displayed to the user.

        Parameters
        ----------
        row_num: int
            The row number of the line being parsed.
        """
        try:
            # assert for mypy, data is Iterator[str] or None, would error in next
            assert self.data is not None
            line = next(self.data)
            # for mypy
            assert isinstance(line, list)
            return line
        except csv.Error as e:
            if (
                self.on_bad_lines == self.BadLineHandleMethod.ERROR
                or self.on_bad_lines == self.BadLineHandleMethod.WARN
            ):
                msg = str(e)

                if "NULL byte" in msg or "line contains NUL" in msg:
                    msg = (
                        "NULL byte detected. This byte "
                        "cannot be processed in Python's "
                        "native csv library at the moment, "
                        "so please pass in engine='c' instead"
                    )

                if self.skipfooter > 0:
                    reason = (
                        "Error could possibly be due to "
                        "parsing errors in the skipped footer rows "
                        "(the skipfooter keyword is only applied "
                        "after Python's csv library has parsed "
                        "all rows)."
                    )
                    msg += ". " + reason

                self._alert_malformed(msg, row_num)
            return None

    def _check_comments(self, lines: list[list[Scalar]]) -> list[list[Scalar]]:
        if self.comment is None:
            return lines
        ret = []
        for line in lines:
            rl = []
            for x in line:
                if (
                    not isinstance(x, str)
                    or self.comment not in x
                    or x in self.na_values
                ):
                    rl.append(x)
                else:
                    x = x[: x.find(self.comment)]
                    if len(x) > 0:
                        rl.append(x)
                    break
            ret.append(rl)
        return ret
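
    # For example, with comment="#" a row ["1", "2#rest", "3"] becomes
    # ["1", "2"]: the field containing "#" is truncated, and the break
    # drops everything after it on that line.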

    def _remove_empty_lines(self, lines: list[list[Scalar]]) -> list[list[Scalar]]:
        """
        Iterate through the lines and remove any that are
        either empty or contain only one whitespace value

        Parameters
        ----------
        lines : list of list of Scalars
            The array of lines that we are to filter.

        Returns
        -------
        filtered_lines : list of list of Scalars
            The same array of lines with the "empty" ones removed.
        """
        ret = []
        for line in lines:
            # Remove empty lines and lines with only one whitespace value
            if (
                len(line) > 1
                or len(line) == 1
                and (not isinstance(line[0], str) or line[0].strip())
            ):
                ret.append(line)
        return ret
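
    # For example, [["a"], [" "], [], ["b", ""]] filters to
    # [["a"], ["b", ""]]: empty lines and single-field whitespace-only
    # lines are dropped, multi-field lines are always kept.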

    def _check_thousands(self, lines: list[list[Scalar]]) -> list[list[Scalar]]:
        if self.thousands is None:
            return lines

        return self._search_replace_num_columns(
            lines=lines, search=self.thousands, replace=""
        )

    def _search_replace_num_columns(
        self, lines: list[list[Scalar]], search: str, replace: str
    ) -> list[list[Scalar]]:
        ret = []
        for line in lines:
            rl = []
            for i, x in enumerate(line):
                if (
                    not isinstance(x, str)
                    or search not in x
                    or (self._no_thousands_columns and i in self._no_thousands_columns)
                    or not self.num.search(x.strip())
                ):
                    rl.append(x)
                else:
                    rl.append(x.replace(search, replace))
            ret.append(rl)
        return ret
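
    # For example, with thousands=",", the row ["1,234", "foo,bar"]
    # becomes ["1234", "foo,bar"]: only fields matching self.num (the
    # numeric regex built in __init__) have the separator stripped.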

    def _check_decimal(self, lines: list[list[Scalar]]) -> list[list[Scalar]]:
        if self.decimal == parser_defaults["decimal"]:
            return lines

        return self._search_replace_num_columns(
            lines=lines, search=self.decimal, replace="."
        )

    def _clear_buffer(self) -> None:
        self.buf = []

    _implicit_index = False

    def _get_index_name(
        self, columns: list[Hashable]
    ) -> tuple[Sequence[Hashable] | None, list[Hashable], list[Hashable]]:
        """
        Try several cases to get lines:

        0) There are headers on row 0 and row 1 and their
           total summed lengths equals the length of the next line.
           Treat row 0 as columns and row 1 as indices
        1) Look for implicit index: there are more columns
           on row 1 than row 0. If this is true, assume that row
           1 lists index columns and row 0 lists normal columns.
        2) Get index from the columns if it was listed.
        """
        orig_names = list(columns)
        columns = list(columns)

        line: list[Scalar] | None
        if self._header_line is not None:
            line = self._header_line
        else:
            try:
                line = self._next_line()
            except StopIteration:
                line = None

        next_line: list[Scalar] | None
        try:
            next_line = self._next_line()
        except StopIteration:
            next_line = None

        # implicitly index_col=0 b/c 1 fewer column names
        implicit_first_cols = 0
        if line is not None:
            # leave it 0, #2442
            # Case 1
            # error: Cannot determine type of 'index_col'
            index_col = self.index_col  # type: ignore[has-type]
            if index_col is not False:
                implicit_first_cols = len(line) - self.num_original_columns

            # Case 0
            if (
                next_line is not None
                and self.header is not None
                and index_col is not False
            ):
                if len(next_line) == len(line) + self.num_original_columns:
                    # column and index names on diff rows
                    self.index_col = list(range(len(line)))
                    self.buf = self.buf[1:]

                    for c in reversed(line):
                        columns.insert(0, c)

                    # Update list of original names to include all indices.
                    orig_names = list(columns)
                    self.num_original_columns = len(columns)
                    return line, orig_names, columns

        if implicit_first_cols > 0:
            # Case 1
            self._implicit_index = True
            if self.index_col is None:
                self.index_col = list(range(implicit_first_cols))

            index_name = None

        else:
            # Case 2
            (index_name, _, self.index_col) = self._clean_index_names(
                columns, self.index_col
            )

        return index_name, orig_names, columns

    def _rows_to_cols(self, content: list[list[Scalar]]) -> list[np.ndarray]:
        col_len = self.num_original_columns

        if self._implicit_index:
            col_len += len(self.index_col)

        max_len = max(len(row) for row in content)

        # Check that there are no rows with too many
        # elements in their row (rows with too few
        # elements are padded with NaN).
        # error: Non-overlapping identity check (left operand type: "List[int]",
        # right operand type: "Literal[False]")
        if (
            max_len > col_len
            and self.index_col is not False  # type: ignore[comparison-overlap]
            and self.usecols is None
        ):
            footers = self.skipfooter if self.skipfooter else 0
            bad_lines = []

            iter_content = enumerate(content)
            content_len = len(content)
            content = []

            for (i, l) in iter_content:
                actual_len = len(l)

                if actual_len > col_len:
                    if callable(self.on_bad_lines):
                        new_l = self.on_bad_lines(l)
                        if new_l is not None:
                            content.append(new_l)
                    elif (
                        self.on_bad_lines == self.BadLineHandleMethod.ERROR
                        or self.on_bad_lines == self.BadLineHandleMethod.WARN
                    ):
                        row_num = self.pos - (content_len - i + footers)
                        bad_lines.append((row_num, actual_len))

                        if self.on_bad_lines == self.BadLineHandleMethod.ERROR:
                            break
                else:
                    content.append(l)

            for row_num, actual_len in bad_lines:
                msg = (
                    f"Expected {col_len} fields in line {row_num + 1}, saw "
                    f"{actual_len}"
                )
                if (
                    self.delimiter
                    and len(self.delimiter) > 1
                    and self.quoting != csv.QUOTE_NONE
                ):
                    # see gh-13374
                    reason = (
                        "Error could possibly be due to quotes being "
                        "ignored when a multi-char delimiter is used."
                    )
                    msg += ". " + reason

                self._alert_malformed(msg, row_num + 1)

        # see gh-13320
        zipped_content = list(lib.to_object_array(content, min_width=col_len).T)

        if self.usecols:
            assert self._col_indices is not None
            col_indices = self._col_indices

            if self._implicit_index:
                zipped_content = [
                    a
                    for i, a in enumerate(zipped_content)
                    if (
                        i < len(self.index_col)
                        or i - len(self.index_col) in col_indices
                    )
                ]
            else:
                zipped_content = [
                    a for i, a in enumerate(zipped_content) if i in col_indices
                ]
        return zipped_content

    def _get_lines(self, rows: int | None = None) -> list[list[Scalar]]:
        lines = self.buf
        new_rows = None

        # already fetched some number
        if rows is not None:
            # we already have the lines in the buffer
            if len(self.buf) >= rows:
                new_rows, self.buf = self.buf[:rows], self.buf[rows:]

            # need some lines
            else:
                rows -= len(self.buf)

        if new_rows is None:
            if isinstance(self.data, list):
                if self.pos > len(self.data):
                    raise StopIteration
                if rows is None:
                    new_rows = self.data[self.pos :]
                    new_pos = len(self.data)
                else:
                    new_rows = self.data[self.pos : self.pos + rows]
                    new_pos = self.pos + rows

                new_rows = self._remove_skipped_rows(new_rows)
                lines.extend(new_rows)
                self.pos = new_pos

            else:
                new_rows = []
                try:
                    if rows is not None:

                        rows_to_skip = 0
                        if self.skiprows is not None and self.pos is not None:
                            # Only read additional rows if pos is in skiprows
                            rows_to_skip = len(
                                set(self.skiprows) - set(range(self.pos))
                            )

                        for _ in range(rows + rows_to_skip):
                            # assert for mypy, data is Iterator[str] or None, would
                            # error in next
                            assert self.data is not None
                            new_rows.append(next(self.data))

                        len_new_rows = len(new_rows)
                        new_rows = self._remove_skipped_rows(new_rows)
                        lines.extend(new_rows)
                    else:
                        rows = 0

                        while True:
                            new_row = self._next_iter_line(row_num=self.pos + rows + 1)
                            rows += 1

                            if new_row is not None:
                                new_rows.append(new_row)
                        len_new_rows = len(new_rows)

                except StopIteration:
                    len_new_rows = len(new_rows)
                    new_rows = self._remove_skipped_rows(new_rows)
                    lines.extend(new_rows)
                    if len(lines) == 0:
                        raise
                self.pos += len_new_rows

                self.buf = []
        else:
            lines = new_rows

        if self.skipfooter:
            lines = lines[: -self.skipfooter]

        lines = self._check_comments(lines)
        if self.skip_blank_lines:
            lines = self._remove_empty_lines(lines)
        lines = self._check_thousands(lines)
        return self._check_decimal(lines)

    def _remove_skipped_rows(self, new_rows: list[list[Scalar]]) -> list[list[Scalar]]:
        if self.skiprows:
            return [
                row for i, row in enumerate(new_rows) if not self.skipfunc(i + self.pos)
            ]
        return new_rows


class FixedWidthReader(abc.Iterator):
    """
    A reader of fixed-width lines.
    """

    def __init__(
        self,
        f: IO[str] | ReadCsvBuffer[str],
        colspecs: list[tuple[int, int]] | Literal["infer"],
        delimiter: str | None,
        comment: str | None,
        skiprows: set[int] | None = None,
        infer_nrows: int = 100,
    ) -> None:
        self.f = f
        self.buffer: Iterator | None = None
        self.delimiter = "\r\n" + delimiter if delimiter else "\n\r\t "
        self.comment = comment
        if colspecs == "infer":
            self.colspecs = self.detect_colspecs(
                infer_nrows=infer_nrows, skiprows=skiprows
            )
        else:
            self.colspecs = colspecs

        if not isinstance(self.colspecs, (tuple, list)):
            raise TypeError(
                "column specifications must be a list or tuple, "
                f"input was a {type(colspecs).__name__}"
            )

        for colspec in self.colspecs:
            if not (
                isinstance(colspec, (tuple, list))
                and len(colspec) == 2
                and isinstance(colspec[0], (int, np.integer, type(None)))
                and isinstance(colspec[1], (int, np.integer, type(None)))
            ):
                raise TypeError(
                    "Each column specification must be "
                    "2 element tuple or list of integers"
                )

    def get_rows(self, infer_nrows: int, skiprows: set[int] | None = None) -> list[str]:
        """
        Read rows from self.f, skipping as specified.

        We distinguish buffer_rows (the first <= infer_nrows
        lines) from the rows returned to detect_colspecs
        because it's simpler to leave the other locations
        with skiprows logic alone than to modify them to
        deal with the fact we skipped some rows here as
        well.

        Parameters
        ----------
        infer_nrows : int
            Number of rows to read from self.f, not counting
            rows that are skipped.
        skiprows: set, optional
            Indices of rows to skip.

        Returns
        -------
        detect_rows : list of str
            A list containing the rows to read.

        """
        if skiprows is None:
            skiprows = set()
        buffer_rows = []
        detect_rows = []
        for i, row in enumerate(self.f):
            if i not in skiprows:
                detect_rows.append(row)
            buffer_rows.append(row)
            if len(detect_rows) >= infer_nrows:
                break
        self.buffer = iter(buffer_rows)
        return detect_rows

    def detect_colspecs(
        self, infer_nrows: int = 100, skiprows: set[int] | None = None
    ) -> list[tuple[int, int]]:
        # Regex escape the delimiters
        delimiters = "".join([rf"\{x}" for x in self.delimiter])
        pattern = re.compile(f"([^{delimiters}]+)")
        rows = self.get_rows(infer_nrows, skiprows)
        if not rows:
            raise EmptyDataError("No rows from which to infer column width")
        max_len = max(map(len, rows))
        mask = np.zeros(max_len + 1, dtype=int)
        if self.comment is not None:
            rows = [row.partition(self.comment)[0] for row in rows]
        for row in rows:
            for m in pattern.finditer(row):
                mask[m.start() : m.end()] = 1
        shifted = np.roll(mask, 1)
        shifted[0] = 0
        edges = np.where((mask ^ shifted) == 1)[0]
        edge_pairs = list(zip(edges[::2], edges[1::2]))
        return edge_pairs
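
    # Worked example: for the single row "12  345" the non-delimiter mask
    # is [1, 1, 0, 0, 1, 1, 1, 0]; XOR-ing it with its shifted copy marks
    # the rising/falling edges at positions [0, 2, 4, 7], which pair up
    # into the inferred colspecs [(0, 2), (4, 7)].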

    def __next__(self) -> list[str]:
        # Argument 1 to "next" has incompatible type "Union[IO[str],
        # ReadCsvBuffer[str]]"; expected "SupportsNext[str]"
        if self.buffer is not None:
            try:
                line = next(self.buffer)
            except StopIteration:
                self.buffer = None
                line = next(self.f)  # type: ignore[arg-type]
        else:
            line = next(self.f)  # type: ignore[arg-type]
        # Note: 'colspecs' is a sequence of half-open intervals.
        return [line[fromm:to].strip(self.delimiter) for (fromm, to) in self.colspecs]


class FixedWidthFieldParser(PythonParser):
    """
    Specialization that converts fixed-width fields into DataFrames.
    See PythonParser for details.
    """

    def __init__(self, f: ReadCsvBuffer[str], **kwds) -> None:
        # Support iterators, convert to a list.
        self.colspecs = kwds.pop("colspecs")
        self.infer_nrows = kwds.pop("infer_nrows")
        PythonParser.__init__(self, f, **kwds)

    def _make_reader(self, f: IO[str] | ReadCsvBuffer[str]) -> None:
        self.data = FixedWidthReader(
            f,
            self.colspecs,
            self.delimiter,
            self.comment,
            self.skiprows,
            self.infer_nrows,
        )

    def _remove_empty_lines(self, lines: list[list[Scalar]]) -> list[list[Scalar]]:
        """
        Returns the list of lines without the empty ones. With fixed-width
        fields, empty lines become arrays of empty strings.

        See PythonParser._remove_empty_lines.
        """
        return [
            line
            for line in lines
            if any(not isinstance(e, str) or e.strip() for e in line)
        ]


def count_empty_vals(vals) -> int:
    return sum(1 for v in vals if v == "" or v is None)
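
# For example, count_empty_vals(["", None, "a"]) returns 2.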


def _validate_skipfooter_arg(skipfooter: int) -> int:
    """
    Validate the 'skipfooter' parameter.

    Checks whether 'skipfooter' is a non-negative integer.
    Raises a ValueError if that is not the case.

    Parameters
    ----------
    skipfooter : non-negative integer
        The number of rows to skip at the end of the file.

    Returns
    -------
    validated_skipfooter : non-negative integer
        The original input if the validation succeeds.

    Raises
    ------
    ValueError : 'skipfooter' was not a non-negative integer.
    """
    if not is_integer(skipfooter):
        raise ValueError("skipfooter must be an integer")

    if skipfooter < 0:
        raise ValueError("skipfooter cannot be negative")

    return skipfooter
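
# Usage sketch: the parsers in this module are normally reached through
# the public pandas entry points rather than instantiated directly, e.g.:
#
#     >>> import io
#     >>> import pandas as pd
#     >>> pd.read_csv(io.StringIO("a;b\n1;2\n"), sep=";", engine="python")
#     >>> pd.read_fwf(io.StringIO("12  345\n67  890\n"), header=None)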