Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/pandas/io/parsers/python_parser.py: 6%

639 statements

coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

from __future__ import annotations

from collections import (
    abc,
    defaultdict,
)
import csv
from io import StringIO
import re
import sys
from typing import (
    IO,
    TYPE_CHECKING,
    DefaultDict,
    Hashable,
    Iterator,
    List,
    Literal,
    Mapping,
    Sequence,
    cast,
)
import warnings

import numpy as np

import pandas._libs.lib as lib
from pandas._typing import (
    ArrayLike,
    ReadCsvBuffer,
    Scalar,
)
from pandas.errors import (
    EmptyDataError,
    ParserError,
)
from pandas.util._exceptions import find_stack_level

from pandas.core.dtypes.common import is_integer
from pandas.core.dtypes.inference import is_dict_like

from pandas.io.parsers.base_parser import (
    ParserBase,
    parser_defaults,
)

if TYPE_CHECKING:
    from pandas import (
        Index,
        MultiIndex,
    )

# BOM character (byte order mark)
# This exists at the beginning of a file to indicate endianness
# of a file (stream). Unfortunately, this marker screws up parsing,
# so we need to remove it if we see it.
_BOM = "\ufeff"


class PythonParser(ParserBase):
    def __init__(self, f: ReadCsvBuffer[str] | list, **kwds) -> None:
        """
        Workhorse function for processing nested list into DataFrame
        """
        super().__init__(kwds)

        self.data: Iterator[str] | None = None
        self.buf: list = []
        self.pos = 0
        self.line_pos = 0

        self.skiprows = kwds["skiprows"]

        if callable(self.skiprows):
            self.skipfunc = self.skiprows
        else:
            self.skipfunc = lambda x: x in self.skiprows

        self.skipfooter = _validate_skipfooter_arg(kwds["skipfooter"])
        self.delimiter = kwds["delimiter"]

        self.quotechar = kwds["quotechar"]
        if isinstance(self.quotechar, str):
            self.quotechar = str(self.quotechar)

        self.escapechar = kwds["escapechar"]
        self.doublequote = kwds["doublequote"]
        self.skipinitialspace = kwds["skipinitialspace"]
        self.lineterminator = kwds["lineterminator"]
        self.quoting = kwds["quoting"]
        self.skip_blank_lines = kwds["skip_blank_lines"]

        self.names_passed = kwds["names"] or None

        self.has_index_names = False
        if "has_index_names" in kwds:
            self.has_index_names = kwds["has_index_names"]

        self.verbose = kwds["verbose"]

        self.thousands = kwds["thousands"]
        self.decimal = kwds["decimal"]

        self.comment = kwds["comment"]

        # Set self.data to something that can read lines.
        if isinstance(f, list):
            # read_excel: f is a list
            self.data = cast(Iterator[str], f)
        else:
            assert hasattr(f, "readline")
            self._make_reader(f)

        # Get columns in two steps: infer from data, then
        # infer column indices from self.usecols if it is specified.
        self._col_indices: list[int] | None = None
        columns: list[list[Scalar | None]]
        (
            columns,
            self.num_original_columns,
            self.unnamed_cols,
        ) = self._infer_columns()

        # Now self.columns has the set of columns that we will process.
        # The original set is stored in self.original_columns.
        # error: Cannot determine type of 'index_names'
        self.columns: list[Hashable]
        (
            self.columns,
            self.index_names,
            self.col_names,
            _,
        ) = self._extract_multi_indexer_columns(
            columns,
            self.index_names,  # type: ignore[has-type]
        )

        # get popped off for index
        self.orig_names: list[Hashable] = list(self.columns)

        # needs to be cleaned/refactored
        # multiple date column thing turning into a real spaghetti factory

        if not self._has_complex_date_col:
            (index_names, self.orig_names, self.columns) = self._get_index_name(
                self.columns
            )
            self._name_processed = True
            if self.index_names is None:
                self.index_names = index_names

        if self._col_indices is None:
            self._col_indices = list(range(len(self.columns)))

        self._parse_date_cols = self._validate_parse_dates_presence(self.columns)
        no_thousands_columns: set[int] | None = None
        if self.parse_dates:
            no_thousands_columns = self._set_noconvert_dtype_columns(
                self._col_indices, self.columns
            )
        self._no_thousands_columns = no_thousands_columns

        if len(self.decimal) != 1:
            raise ValueError("Only length-1 decimal markers supported")

        decimal = re.escape(self.decimal)
        if self.thousands is None:
            regex = rf"^[\-\+]?[0-9]*({decimal}[0-9]*)?([0-9]?(E|e)\-?[0-9]+)?$"
        else:
            thousands = re.escape(self.thousands)
            regex = (
                rf"^[\-\+]?([0-9]+{thousands}|[0-9])*({decimal}[0-9]*)?"
                rf"([0-9]?(E|e)\-?[0-9]+)?$"
            )
        self.num = re.compile(regex)
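
    # Illustration (not part of the pandas source): what self.num matches once
    # the branch above is instantiated with thousands="," and decimal=".".
    # A minimal sketch; the pattern is the one built above with those two
    # separators escaped in.
    #
    #     >>> import re
    #     >>> num = re.compile(
    #     ...     r"^[\-\+]?([0-9]+,|[0-9])*(\.[0-9]*)?([0-9]?(E|e)\-?[0-9]+)?$"
    #     ... )
    #     >>> bool(num.search("1,234.5"))
    #     True
    #     >>> bool(num.search("1,234 USD"))  # trailing text: not a plain number
    #     False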

    def _make_reader(self, f: IO[str] | ReadCsvBuffer[str]) -> None:
        sep = self.delimiter

        if sep is None or len(sep) == 1:
            if self.lineterminator:
                raise ValueError(
                    "Custom line terminators not supported in python parser (yet)"
                )

            class MyDialect(csv.Dialect):
                delimiter = self.delimiter
                quotechar = self.quotechar
                escapechar = self.escapechar
                doublequote = self.doublequote
                skipinitialspace = self.skipinitialspace
                quoting = self.quoting
                lineterminator = "\n"

            dia = MyDialect

            if sep is not None:
                dia.delimiter = sep
            else:
                # attempt to sniff the delimiter from the first valid line,
                # i.e. no comment line and not in skiprows
                line = f.readline()
                lines = self._check_comments([[line]])[0]
                while self.skipfunc(self.pos) or not lines:
                    self.pos += 1
                    line = f.readline()
                    lines = self._check_comments([[line]])[0]
                lines_str = cast(List[str], lines)

                # since `line` was a string, lines will be a list containing
                # only a single string
                line = lines_str[0]

                self.pos += 1
                self.line_pos += 1
                sniffed = csv.Sniffer().sniff(line)
                dia.delimiter = sniffed.delimiter

                # Note: encoding is irrelevant here
                line_rdr = csv.reader(StringIO(line), dialect=dia)
                self.buf.extend(list(line_rdr))

            # Note: encoding is irrelevant here
            reader = csv.reader(f, dialect=dia, strict=True)

        else:

            def _read():
                line = f.readline()
                pat = re.compile(sep)

                yield pat.split(line.strip())

                for line in f:
                    yield pat.split(line.strip())

            reader = _read()

        # error: Incompatible types in assignment (expression has type "_reader",
        # variable has type "Union[IO[Any], RawIOBase, BufferedIOBase, TextIOBase,
        # TextIOWrapper, mmap, None]")
        self.data = reader  # type: ignore[assignment]
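
    # Illustration (not part of the pandas source): the sniffing branch above
    # delegates delimiter detection to the stdlib. A minimal sketch of that
    # call on a small sample:
    #
    #     >>> import csv
    #     >>> csv.Sniffer().sniff("a;b;c\n1;2;3\n").delimiter
    #     ';'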

    def read(
        self, rows: int | None = None
    ) -> tuple[
        Index | None, Sequence[Hashable] | MultiIndex, Mapping[Hashable, ArrayLike]
    ]:
        try:
            content = self._get_lines(rows)
        except StopIteration:
            if self._first_chunk:
                content = []
            else:
                self.close()
                raise

        # done with first read, next time raise StopIteration
        self._first_chunk = False

        columns: Sequence[Hashable] = list(self.orig_names)
        if not len(content):  # pragma: no cover
            # DataFrame with the right metadata, even though it's length 0
            names = self._maybe_dedup_names(self.orig_names)
            # error: Cannot determine type of 'index_col'
            index, columns, col_dict = self._get_empty_meta(
                names,
                self.index_col,  # type: ignore[has-type]
                self.index_names,
                self.dtype,
            )
            conv_columns = self._maybe_make_multi_index_columns(columns, self.col_names)
            return index, conv_columns, col_dict

        # handle new style for names in index
        count_empty_content_vals = count_empty_vals(content[0])
        indexnamerow = None
        if self.has_index_names and count_empty_content_vals == len(columns):
            indexnamerow = content[0]
            content = content[1:]

        alldata = self._rows_to_cols(content)
        data, columns = self._exclude_implicit_index(alldata)

        conv_data = self._convert_data(data)
        columns, conv_data = self._do_date_conversions(columns, conv_data)

        index, result_columns = self._make_index(
            conv_data, alldata, columns, indexnamerow
        )

        return index, result_columns, conv_data

    def _exclude_implicit_index(
        self,
        alldata: list[np.ndarray],
    ) -> tuple[Mapping[Hashable, np.ndarray], Sequence[Hashable]]:
        names = self._maybe_dedup_names(self.orig_names)

        offset = 0
        if self._implicit_index:
            # error: Cannot determine type of 'index_col'
            offset = len(self.index_col)  # type: ignore[has-type]

        len_alldata = len(alldata)
        self._check_data_length(names, alldata)

        return {
            name: alldata[i + offset] for i, name in enumerate(names) if i < len_alldata
        }, names
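
    # Illustration (not part of the pandas source): with an implicit index of
    # one column, the index array sits first in `alldata` and the named data
    # columns are shifted by `offset`. A hedged, simplified sketch of the
    # mapping built above (the `i < len_alldata` guard omitted):
    #
    #     >>> names = ["a", "b"]
    #     >>> alldata = [[0, 1], [10, 11], [20, 21]]  # index col + 2 data cols
    #     >>> offset = 1
    #     >>> {name: alldata[i + offset] for i, name in enumerate(names)}
    #     {'a': [10, 11], 'b': [20, 21]}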

    # legacy
    def get_chunk(
        self, size: int | None = None
    ) -> tuple[
        Index | None, Sequence[Hashable] | MultiIndex, Mapping[Hashable, ArrayLike]
    ]:
        if size is None:
            # error: "PythonParser" has no attribute "chunksize"
            size = self.chunksize  # type: ignore[attr-defined]
        return self.read(rows=size)

    def _convert_data(
        self,
        data: Mapping[Hashable, np.ndarray],
    ) -> Mapping[Hashable, ArrayLike]:
        # apply converters
        clean_conv = self._clean_mapping(self.converters)
        clean_dtypes = self._clean_mapping(self.dtype)

        # Apply NA values.
        clean_na_values = {}
        clean_na_fvalues = {}

        if isinstance(self.na_values, dict):
            for col in self.na_values:
                na_value = self.na_values[col]
                na_fvalue = self.na_fvalues[col]

                if isinstance(col, int) and col not in self.orig_names:
                    col = self.orig_names[col]

                clean_na_values[col] = na_value
                clean_na_fvalues[col] = na_fvalue
        else:
            clean_na_values = self.na_values
            clean_na_fvalues = self.na_fvalues

        return self._convert_to_ndarrays(
            data,
            clean_na_values,
            clean_na_fvalues,
            self.verbose,
            clean_conv,
            clean_dtypes,
        )

    def _infer_columns(
        self,
    ) -> tuple[list[list[Scalar | None]], int, set[Scalar | None]]:
        names = self.names
        num_original_columns = 0
        clear_buffer = True
        unnamed_cols: set[Scalar | None] = set()
        self._header_line = None

        if self.header is not None:
            header = self.header

            if isinstance(header, (list, tuple, np.ndarray)):
                have_mi_columns = len(header) > 1
                # we have mi columns, so read an extra line
                if have_mi_columns:
                    header = list(header) + [header[-1] + 1]
            else:
                have_mi_columns = False
                header = [header]

            columns: list[list[Scalar | None]] = []
            for level, hr in enumerate(header):
                try:
                    line = self._buffered_line()

                    while self.line_pos <= hr:
                        line = self._next_line()

                except StopIteration as err:
                    if 0 < self.line_pos <= hr and (
                        not have_mi_columns or hr != header[-1]
                    ):
                        # If no rows we want to raise a different message and if
                        # we have mi columns, the last line is not part of the header
                        joi = list(map(str, header[:-1] if have_mi_columns else header))
                        msg = f"[{','.join(joi)}], len of {len(joi)}, "
                        raise ValueError(
                            f"Passed header={msg}"
                            f"but only {self.line_pos} lines in file"
                        ) from err

                    # We have an empty file, so check
                    # if columns are provided. That will
                    # serve as the 'line' for parsing
                    if have_mi_columns and hr > 0:
                        if clear_buffer:
                            self._clear_buffer()
                        columns.append([None] * len(columns[-1]))
                        return columns, num_original_columns, unnamed_cols

                    if not self.names:
                        raise EmptyDataError("No columns to parse from file") from err

                    line = self.names[:]

                this_columns: list[Scalar | None] = []
                this_unnamed_cols = []

                for i, c in enumerate(line):
                    if c == "":
                        if have_mi_columns:
                            col_name = f"Unnamed: {i}_level_{level}"
                        else:
                            col_name = f"Unnamed: {i}"

                        this_unnamed_cols.append(i)
                        this_columns.append(col_name)
                    else:
                        this_columns.append(c)

                if not have_mi_columns and self.mangle_dupe_cols:
                    counts: DefaultDict = defaultdict(int)
                    # Ensure that regular columns are used before unnamed ones
                    # to keep given names and mangle unnamed columns
                    col_loop_order = [
                        i
                        for i in range(len(this_columns))
                        if i not in this_unnamed_cols
                    ] + this_unnamed_cols

                    for i in col_loop_order:
                        col = this_columns[i]
                        old_col = col
                        cur_count = counts[col]

                        if cur_count > 0:
                            while cur_count > 0:
                                counts[old_col] = cur_count + 1
                                col = f"{old_col}.{cur_count}"
                                if col in this_columns:
                                    cur_count += 1
                                else:
                                    cur_count = counts[col]

                            if (
                                self.dtype is not None
                                and is_dict_like(self.dtype)
                                and self.dtype.get(old_col) is not None
                                and self.dtype.get(col) is None
                            ):
                                self.dtype.update({col: self.dtype.get(old_col)})

                        this_columns[i] = col
                        counts[col] = cur_count + 1
                elif have_mi_columns:

                    # if we have grabbed an extra line, but it's not in our
                    # format, save it in the buffer and create a blank extra
                    # line for the rest of the parsing code
                    if hr == header[-1]:
                        lc = len(this_columns)
                        # error: Cannot determine type of 'index_col'
                        sic = self.index_col  # type: ignore[has-type]
                        ic = len(sic) if sic is not None else 0
                        unnamed_count = len(this_unnamed_cols)

                        # if wrong number of blanks or no index, not our format
                        if (lc != unnamed_count and lc - ic > unnamed_count) or ic == 0:
                            clear_buffer = False
                            this_columns = [None] * lc
                            self.buf = [self.buf[-1]]

                columns.append(this_columns)
                unnamed_cols.update({this_columns[i] for i in this_unnamed_cols})

                if len(columns) == 1:
                    num_original_columns = len(this_columns)

            if clear_buffer:
                self._clear_buffer()

            first_line: list[Scalar] | None
            if names is not None:
                # Read first row after header to check if data are longer
                try:
                    first_line = self._next_line()
                except StopIteration:
                    first_line = None

                len_first_data_row = 0 if first_line is None else len(first_line)

                if len(names) > len(columns[0]) and len(names) > len_first_data_row:
                    raise ValueError(
                        "Number of passed names did not match "
                        "number of header fields in the file"
                    )
                if len(columns) > 1:
                    raise TypeError("Cannot pass names with multi-index columns")

                if self.usecols is not None:
                    # Set _use_cols. We don't store columns because they are
                    # overwritten.
                    self._handle_usecols(columns, names, num_original_columns)
                else:
                    num_original_columns = len(names)
                    if self._col_indices is not None and len(names) != len(
                        self._col_indices
                    ):
                        columns = [[names[i] for i in sorted(self._col_indices)]]
                    else:
                        columns = [names]
            else:
                columns = self._handle_usecols(
                    columns, columns[0], num_original_columns
                )
        else:
            try:
                line = self._buffered_line()

            except StopIteration as err:
                if not names:
                    raise EmptyDataError("No columns to parse from file") from err

                line = names[:]

            # Store line, otherwise it is lost for guessing the index
            self._header_line = line
            ncols = len(line)
            num_original_columns = ncols

            if not names:
                if self.prefix:
                    columns = [[f"{self.prefix}{i}" for i in range(ncols)]]
                else:
                    columns = [list(range(ncols))]
                columns = self._handle_usecols(
                    columns, columns[0], num_original_columns
                )
            else:
                if self.usecols is None or len(names) >= num_original_columns:
                    columns = self._handle_usecols([names], names, num_original_columns)
                    num_original_columns = len(names)
                else:
                    if not callable(self.usecols) and len(names) != len(self.usecols):
                        raise ValueError(
                            "Number of passed names did not match number of "
                            "header fields in the file"
                        )
                    # Ignore output but set used columns.
                    self._handle_usecols([names], names, ncols)
                    columns = [names]
                    num_original_columns = ncols

        return columns, num_original_columns, unnamed_cols

    def _handle_usecols(
        self,
        columns: list[list[Scalar | None]],
        usecols_key: list[Scalar | None],
        num_original_columns: int,
    ) -> list[list[Scalar | None]]:
        """
        Sets self._col_indices

        usecols_key is used if there are string usecols.
        """
        col_indices: set[int] | list[int]
        if self.usecols is not None:
            if callable(self.usecols):
                col_indices = self._evaluate_usecols(self.usecols, usecols_key)
            elif any(isinstance(u, str) for u in self.usecols):
                if len(columns) > 1:
                    raise ValueError(
                        "If using multiple headers, usecols must be integers."
                    )
                col_indices = []

                for col in self.usecols:
                    if isinstance(col, str):
                        try:
                            col_indices.append(usecols_key.index(col))
                        except ValueError:
                            self._validate_usecols_names(self.usecols, usecols_key)
                    else:
                        col_indices.append(col)
            else:
                missing_usecols = [
                    col for col in self.usecols if col >= num_original_columns
                ]
                if missing_usecols:
                    warnings.warn(
                        "Defining usecols with out of bounds indices is deprecated "
                        "and will raise a ParserError in a future version.",
                        FutureWarning,
                        stacklevel=find_stack_level(),
                    )
                col_indices = self.usecols

            columns = [
                [n for i, n in enumerate(column) if i in col_indices]
                for column in columns
            ]
            self._col_indices = sorted(col_indices)
        return columns
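
    # Illustration (not part of the pandas source): how string usecols resolve
    # to sorted positional indices against the header row (`usecols_key`),
    # mirroring the `usecols_key.index(col)` / `sorted(col_indices)` steps above.
    #
    #     >>> usecols_key = ["a", "b", "c"]
    #     >>> sorted(usecols_key.index(col) for col in ["c", "a"])
    #     [0, 2]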

    def _buffered_line(self) -> list[Scalar]:
        """
        Return a line from buffer, filling buffer if required.
        """
        if len(self.buf) > 0:
            return self.buf[0]
        else:
            return self._next_line()

    def _check_for_bom(self, first_row: list[Scalar]) -> list[Scalar]:
        """
        Checks whether the file begins with the BOM character.
        If it does, remove it. In addition, if there is quoting
        in the field subsequent to the BOM, remove it as well
        because it technically takes place at the beginning of
        the name, not the middle of it.
        """
        # first_row will be a list, so we need to check
        # that that list is not empty before proceeding.
        if not first_row:
            return first_row

        # The first element of this row is the one that could have the
        # BOM that we want to remove. Check that the first element is a
        # string before proceeding.
        if not isinstance(first_row[0], str):
            return first_row

        # Check that the string is not empty, as that would
        # obviously not have a BOM at the start of it.
        if not first_row[0]:
            return first_row

        # Since the string is non-empty, check that it does
        # in fact begin with a BOM.
        first_elt = first_row[0][0]
        if first_elt != _BOM:
            return first_row

        first_row_bom = first_row[0]
        new_row: str

        if len(first_row_bom) > 1 and first_row_bom[1] == self.quotechar:
            start = 2
            quote = first_row_bom[1]
            end = first_row_bom[2:].index(quote) + 2

            # Extract the data between the quotation marks
            new_row = first_row_bom[start:end]

            # Extract any remaining data after the second
            # quotation mark.
            if len(first_row_bom) > end + 1:
                new_row += first_row_bom[end + 1 :]

        else:

            # No quotation so just remove BOM from first element
            new_row = first_row_bom[1:]

        new_row_list: list[Scalar] = [new_row]
        return new_row_list + first_row[1:]
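
    # Illustration (not part of the pandas source): the quoted branch above,
    # traced by hand for quotechar '"'. Both the BOM and the surrounding
    # quotes are dropped from the first field.
    #
    #     >>> s = '\ufeff"ab"cd'
    #     >>> end = s[2:].index('"') + 2   # -> 4
    #     >>> s[2:end] + s[end + 1 :]
    #     'abcd'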

    def _is_line_empty(self, line: list[Scalar]) -> bool:
        """
        Check if a line is empty or not.

        Parameters
        ----------
        line : str, array-like
            The line of data to check.

        Returns
        -------
        boolean : Whether or not the line is empty.
        """
        return not line or all(not x for x in line)

    def _next_line(self) -> list[Scalar]:
        if isinstance(self.data, list):
            while self.skipfunc(self.pos):
                if self.pos >= len(self.data):
                    break
                self.pos += 1

            while True:
                try:
                    line = self._check_comments([self.data[self.pos]])[0]
                    self.pos += 1
                    # either uncommented or blank to begin with
                    if not self.skip_blank_lines and (
                        self._is_line_empty(self.data[self.pos - 1]) or line
                    ):
                        break
                    elif self.skip_blank_lines:
                        ret = self._remove_empty_lines([line])
                        if ret:
                            line = ret[0]
                            break
                except IndexError:
                    raise StopIteration
        else:
            while self.skipfunc(self.pos):
                self.pos += 1
                # assert for mypy, data is Iterator[str] or None, would error in next
                assert self.data is not None
                next(self.data)

            while True:
                orig_line = self._next_iter_line(row_num=self.pos + 1)
                self.pos += 1

                if orig_line is not None:
                    line = self._check_comments([orig_line])[0]

                    if self.skip_blank_lines:
                        ret = self._remove_empty_lines([line])

                        if ret:
                            line = ret[0]
                            break
                    elif self._is_line_empty(orig_line) or line:
                        break

        # This was the first line of the file,
        # which could contain the BOM at the
        # beginning of it.
        if self.pos == 1:
            line = self._check_for_bom(line)

        self.line_pos += 1
        self.buf.append(line)
        return line

    def _alert_malformed(self, msg: str, row_num: int) -> None:
        """
        Alert a user about a malformed row, depending on value of
        `self.on_bad_lines` enum.

        If `self.on_bad_lines` is ERROR, the alert will be `ParserError`.
        If `self.on_bad_lines` is WARN, the alert will be printed out.

        Parameters
        ----------
        msg: str
            The error message to display.
        row_num: int
            The row number where the parsing error occurred.
            Because this row number is displayed, we 1-index,
            even though we 0-index internally.
        """
        if self.on_bad_lines == self.BadLineHandleMethod.ERROR:
            raise ParserError(msg)
        elif self.on_bad_lines == self.BadLineHandleMethod.WARN:
            base = f"Skipping line {row_num}: "
            sys.stderr.write(base + msg + "\n")
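
    # Illustration (not part of the pandas source): how the two enum values
    # surface to a user of the python engine. A hedged sketch via the public
    # read_csv API, with a row that has too many fields:
    #
    #     >>> import io
    #     >>> import pandas as pd
    #     >>> bad = io.StringIO("a,b\n1,2\n3,4,5\n")
    #     >>> pd.read_csv(bad, engine="python", on_bad_lines="warn")
    #     # writes "Skipping line 3: ..." to stderr and keeps the good rows;
    #     # with on_bad_lines="error" (the default) a ParserError is raised.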

    def _next_iter_line(self, row_num: int) -> list[Scalar] | None:
        """
        Wrapper around iterating through `self.data` (CSV source).

        When a CSV error is raised, we check for specific
        error messages that allow us to customize the
        error message displayed to the user.

        Parameters
        ----------
        row_num: int
            The row number of the line being parsed.
        """
        try:
            # assert for mypy, data is Iterator[str] or None, would error in next
            assert self.data is not None
            line = next(self.data)
            # for mypy
            assert isinstance(line, list)
            return line
        except csv.Error as e:
            if (
                self.on_bad_lines == self.BadLineHandleMethod.ERROR
                or self.on_bad_lines == self.BadLineHandleMethod.WARN
            ):
                msg = str(e)

                if "NULL byte" in msg or "line contains NUL" in msg:
                    msg = (
                        "NULL byte detected. This byte "
                        "cannot be processed in Python's "
                        "native csv library at the moment, "
                        "so please pass in engine='c' instead"
                    )

                if self.skipfooter > 0:
                    reason = (
                        "Error could possibly be due to "
                        "parsing errors in the skipped footer rows "
                        "(the skipfooter keyword is only applied "
                        "after Python's csv library has parsed "
                        "all rows)."
                    )
                    msg += ". " + reason

                self._alert_malformed(msg, row_num)
            return None

    def _check_comments(self, lines: list[list[Scalar]]) -> list[list[Scalar]]:
        if self.comment is None:
            return lines
        ret = []
        for line in lines:
            rl = []
            for x in line:
                if (
                    not isinstance(x, str)
                    or self.comment not in x
                    or x in self.na_values
                ):
                    rl.append(x)
                else:
                    x = x[: x.find(self.comment)]
                    if len(x) > 0:
                        rl.append(x)
                    break
            ret.append(rl)
        return ret

    def _remove_empty_lines(self, lines: list[list[Scalar]]) -> list[list[Scalar]]:
        """
        Iterate through the lines and remove any that are
        either empty or contain only one whitespace value

        Parameters
        ----------
        lines : list of list of Scalars
            The array of lines that we are to filter.

        Returns
        -------
        filtered_lines : list of list of Scalars
            The same array of lines with the "empty" ones removed.
        """
        ret = []
        for line in lines:
            # Remove empty lines and lines with only one whitespace value
            if (
                len(line) > 1
                or len(line) == 1
                and (not isinstance(line[0], str) or line[0].strip())
            ):
                ret.append(line)
        return ret
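
    # Illustration (not part of the pandas source): which parsed lines the
    # filter above keeps. Single-field lines survive only if the field is a
    # non-string or has non-whitespace content.
    #
    #     >>> keep = lambda l: len(l) > 1 or (
    #     ...     len(l) == 1 and (not isinstance(l[0], str) or l[0].strip())
    #     ... )
    #     >>> [l for l in [["a", "b"], [""], ["  "], ["x"], [0]] if keep(l)]
    #     [['a', 'b'], ['x'], [0]]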

    def _check_thousands(self, lines: list[list[Scalar]]) -> list[list[Scalar]]:
        if self.thousands is None:
            return lines

        return self._search_replace_num_columns(
            lines=lines, search=self.thousands, replace=""
        )

    def _search_replace_num_columns(
        self, lines: list[list[Scalar]], search: str, replace: str
    ) -> list[list[Scalar]]:
        ret = []
        for line in lines:
            rl = []
            for i, x in enumerate(line):
                if (
                    not isinstance(x, str)
                    or search not in x
                    or (self._no_thousands_columns and i in self._no_thousands_columns)
                    or not self.num.search(x.strip())
                ):
                    rl.append(x)
                else:
                    rl.append(x.replace(search, replace))
            ret.append(rl)
        return ret
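
    # Illustration (not part of the pandas source): the thousands pass above
    # only rewrites fields that look numeric under self.num; other fields that
    # merely contain the separator are left untouched.
    #
    #     ["1,234", "a,b", "5"]  --thousands=","-->  ["1234", "a,b", "5"]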

    def _check_decimal(self, lines: list[list[Scalar]]) -> list[list[Scalar]]:
        if self.decimal == parser_defaults["decimal"]:
            return lines

        return self._search_replace_num_columns(
            lines=lines, search=self.decimal, replace="."
        )

    def _clear_buffer(self) -> None:
        self.buf = []

    _implicit_index = False

    def _get_index_name(
        self, columns: list[Hashable]
    ) -> tuple[Sequence[Hashable] | None, list[Hashable], list[Hashable]]:
        """
        Try several cases to get lines:

        0) There are headers on row 0 and row 1 and their
           total summed lengths equals the length of the next line.
           Treat row 0 as columns and row 1 as indices
        1) Look for implicit index: there are more columns
           on row 1 than row 0. If this is true, assume that row
           1 lists index columns and row 0 lists normal columns.
        2) Get index from the columns if it was listed.
        """
        orig_names = list(columns)
        columns = list(columns)

        line: list[Scalar] | None
        if self._header_line is not None:
            line = self._header_line
        else:
            try:
                line = self._next_line()
            except StopIteration:
                line = None

        next_line: list[Scalar] | None
        try:
            next_line = self._next_line()
        except StopIteration:
            next_line = None

        # implicitly index_col=0 b/c 1 fewer column names
        implicit_first_cols = 0
        if line is not None:
            # leave it 0, #2442
            # Case 1
            # error: Cannot determine type of 'index_col'
            index_col = self.index_col  # type: ignore[has-type]
            if index_col is not False:
                implicit_first_cols = len(line) - self.num_original_columns

            # Case 0
            if (
                next_line is not None
                and self.header is not None
                and index_col is not False
            ):
                if len(next_line) == len(line) + self.num_original_columns:
                    # column and index names on diff rows
                    self.index_col = list(range(len(line)))
                    self.buf = self.buf[1:]

                    for c in reversed(line):
                        columns.insert(0, c)

                    # Update list of original names to include all indices.
                    orig_names = list(columns)
                    self.num_original_columns = len(columns)
                    return line, orig_names, columns

        if implicit_first_cols > 0:
            # Case 1
            self._implicit_index = True
            if self.index_col is None:
                self.index_col = list(range(implicit_first_cols))

            index_name = None

        else:
            # Case 2
            (index_name, _, self.index_col) = self._clean_index_names(
                columns, self.index_col
            )

        return index_name, orig_names, columns
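
    # Illustration (not part of the pandas source): Case 1 above in file form.
    # The header names two columns but each data row has three fields, so the
    # leftmost field is treated as an implicit index (index_col=[0]).
    #
    #     a,b        <- 2 header names
    #     x,1,2      <- 3 fields: implicit_first_cols = 3 - 2 = 1
    #     y,3,4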

    def _rows_to_cols(self, content: list[list[Scalar]]) -> list[np.ndarray]:
        col_len = self.num_original_columns

        if self._implicit_index:
            col_len += len(self.index_col)

        max_len = max(len(row) for row in content)

        # Check that there are no rows with too many elements
        # (rows with too few elements are padded with NaN).
        # error: Non-overlapping identity check (left operand type: "List[int]",
        # right operand type: "Literal[False]")
        if (
            max_len > col_len
            and self.index_col is not False  # type: ignore[comparison-overlap]
            and self.usecols is None
        ):

            footers = self.skipfooter if self.skipfooter else 0
            bad_lines = []

            iter_content = enumerate(content)
            content_len = len(content)
            content = []

            for (i, l) in iter_content:
                actual_len = len(l)

                if actual_len > col_len:
                    if callable(self.on_bad_lines):
                        new_l = self.on_bad_lines(l)
                        if new_l is not None:
                            content.append(new_l)
                    elif (
                        self.on_bad_lines == self.BadLineHandleMethod.ERROR
                        or self.on_bad_lines == self.BadLineHandleMethod.WARN
                    ):
                        row_num = self.pos - (content_len - i + footers)
                        bad_lines.append((row_num, actual_len))

                        if self.on_bad_lines == self.BadLineHandleMethod.ERROR:
                            break
                else:
                    content.append(l)

            for row_num, actual_len in bad_lines:
                msg = (
                    f"Expected {col_len} fields in line {row_num + 1}, saw "
                    f"{actual_len}"
                )
                if (
                    self.delimiter
                    and len(self.delimiter) > 1
                    and self.quoting != csv.QUOTE_NONE
                ):
                    # see gh-13374
                    reason = (
                        "Error could possibly be due to quotes being "
                        "ignored when a multi-char delimiter is used."
                    )
                    msg += ". " + reason

                self._alert_malformed(msg, row_num + 1)

        # see gh-13320
        zipped_content = list(lib.to_object_array(content, min_width=col_len).T)

        if self.usecols:
            assert self._col_indices is not None
            col_indices = self._col_indices

            if self._implicit_index:
                zipped_content = [
                    a
                    for i, a in enumerate(zipped_content)
                    if (
                        i < len(self.index_col)
                        or i - len(self.index_col) in col_indices
                    )
                ]
            else:
                zipped_content = [
                    a for i, a in enumerate(zipped_content) if i in col_indices
                ]
        return zipped_content

    def _get_lines(self, rows: int | None = None) -> list[list[Scalar]]:
        lines = self.buf
        new_rows = None

        # already fetched some number
        if rows is not None:
            # we already have the lines in the buffer
            if len(self.buf) >= rows:
                new_rows, self.buf = self.buf[:rows], self.buf[rows:]

            # need some lines
            else:
                rows -= len(self.buf)

        if new_rows is None:
            if isinstance(self.data, list):
                if self.pos > len(self.data):
                    raise StopIteration
                if rows is None:
                    new_rows = self.data[self.pos :]
                    new_pos = len(self.data)
                else:
                    new_rows = self.data[self.pos : self.pos + rows]
                    new_pos = self.pos + rows

                new_rows = self._remove_skipped_rows(new_rows)
                lines.extend(new_rows)
                self.pos = new_pos

            else:
                new_rows = []
                try:
                    if rows is not None:

                        rows_to_skip = 0
                        if self.skiprows is not None and self.pos is not None:
                            # Only read additional rows if pos is in skiprows
                            rows_to_skip = len(
                                set(self.skiprows) - set(range(self.pos))
                            )

                        for _ in range(rows + rows_to_skip):
                            # assert for mypy, data is Iterator[str] or None, would
                            # error in next
                            assert self.data is not None
                            new_rows.append(next(self.data))

                        len_new_rows = len(new_rows)
                        new_rows = self._remove_skipped_rows(new_rows)
                        lines.extend(new_rows)
                    else:
                        rows = 0

                        while True:
                            new_row = self._next_iter_line(row_num=self.pos + rows + 1)
                            rows += 1

                            if new_row is not None:
                                new_rows.append(new_row)
                            len_new_rows = len(new_rows)

                except StopIteration:
                    len_new_rows = len(new_rows)
                    new_rows = self._remove_skipped_rows(new_rows)
                    lines.extend(new_rows)
                    if len(lines) == 0:
                        raise
                self.pos += len_new_rows

            self.buf = []
        else:
            lines = new_rows

        if self.skipfooter:
            lines = lines[: -self.skipfooter]

        lines = self._check_comments(lines)
        if self.skip_blank_lines:
            lines = self._remove_empty_lines(lines)
        lines = self._check_thousands(lines)
        return self._check_decimal(lines)

    def _remove_skipped_rows(self, new_rows: list[list[Scalar]]) -> list[list[Scalar]]:
        if self.skiprows:
            return [
                row for i, row in enumerate(new_rows) if not self.skipfunc(i + self.pos)
            ]
        return new_rows


class FixedWidthReader(abc.Iterator):
    """
    A reader of fixed-width lines.
    """

    def __init__(
        self,
        f: IO[str] | ReadCsvBuffer[str],
        colspecs: list[tuple[int, int]] | Literal["infer"],
        delimiter: str | None,
        comment: str | None,
        skiprows: set[int] | None = None,
        infer_nrows: int = 100,
    ) -> None:
        self.f = f
        self.buffer: Iterator | None = None
        self.delimiter = "\r\n" + delimiter if delimiter else "\n\r\t "
        self.comment = comment
        if colspecs == "infer":
            self.colspecs = self.detect_colspecs(
                infer_nrows=infer_nrows, skiprows=skiprows
            )
        else:
            self.colspecs = colspecs

        if not isinstance(self.colspecs, (tuple, list)):
            raise TypeError(
                "column specifications must be a list or tuple, "
                f"input was a {type(colspecs).__name__}"
            )

        for colspec in self.colspecs:
            if not (
                isinstance(colspec, (tuple, list))
                and len(colspec) == 2
                and isinstance(colspec[0], (int, np.integer, type(None)))
                and isinstance(colspec[1], (int, np.integer, type(None)))
            ):
                raise TypeError(
                    "Each column specification must be a "
                    "2 element tuple or list of integers"
                )

    def get_rows(self, infer_nrows: int, skiprows: set[int] | None = None) -> list[str]:
        """
        Read rows from self.f, skipping as specified.

        We distinguish buffer_rows (the first <= infer_nrows
        lines) from the rows returned to detect_colspecs
        because it's simpler to leave the other locations
        with skiprows logic alone than to modify them to
        deal with the fact we skipped some rows here as
        well.

        Parameters
        ----------
        infer_nrows : int
            Number of rows to read from self.f, not counting
            rows that are skipped.
        skiprows: set, optional
            Indices of rows to skip.

        Returns
        -------
        detect_rows : list of str
            A list containing the rows to read.

        """
        if skiprows is None:
            skiprows = set()
        buffer_rows = []
        detect_rows = []
        for i, row in enumerate(self.f):
            if i not in skiprows:
                detect_rows.append(row)
            buffer_rows.append(row)
            if len(detect_rows) >= infer_nrows:
                break
        self.buffer = iter(buffer_rows)
        return detect_rows

    def detect_colspecs(
        self, infer_nrows: int = 100, skiprows: set[int] | None = None
    ) -> list[tuple[int, int]]:
        # Regex escape the delimiters
        delimiters = "".join([rf"\{x}" for x in self.delimiter])
        pattern = re.compile(f"([^{delimiters}]+)")
        rows = self.get_rows(infer_nrows, skiprows)
        if not rows:
            raise EmptyDataError("No rows from which to infer column width")
        max_len = max(map(len, rows))
        mask = np.zeros(max_len + 1, dtype=int)
        if self.comment is not None:
            rows = [row.partition(self.comment)[0] for row in rows]
        for row in rows:
            for m in pattern.finditer(row):
                mask[m.start() : m.end()] = 1
        shifted = np.roll(mask, 1)
        shifted[0] = 0
        edges = np.where((mask ^ shifted) == 1)[0]
        edge_pairs = list(zip(edges[::2], edges[1::2]))
        return edge_pairs
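
    # Illustration (not part of the pandas source): detect_colspecs traced by
    # hand for two rows with the default delimiters. Non-delimiter runs set
    # mask bits; XOR with the shifted mask finds the rising/falling edges.
    #
    #     rows  = ["123  45", "  6  78"]
    #     mask  = [1, 1, 1, 0, 0, 1, 1, 0]   # union of the field spans
    #     edges = [0, 3, 5, 7]               # -> colspecs [(0, 3), (5, 7)]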

    def __next__(self) -> list[str]:
        # Argument 1 to "next" has incompatible type "Union[IO[str],
        # ReadCsvBuffer[str]]"; expected "SupportsNext[str]"
        if self.buffer is not None:
            try:
                line = next(self.buffer)
            except StopIteration:
                self.buffer = None
                line = next(self.f)  # type: ignore[arg-type]
        else:
            line = next(self.f)  # type: ignore[arg-type]
        # Note: 'colspecs' is a sequence of half-open intervals.
        return [line[fromm:to].strip(self.delimiter) for (fromm, to) in self.colspecs]
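
    # Illustration (not part of the pandas source): slicing one fixed-width
    # line with the colspecs inferred above. Each half-open interval is cut
    # out and stripped of delimiter characters.
    #
    #     >>> line, delims = "123  45\n", "\n\r\t "
    #     >>> [line[a:b].strip(delims) for (a, b) in [(0, 3), (5, 7)]]
    #     ['123', '45']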


class FixedWidthFieldParser(PythonParser):
    """
    Specialization that converts fixed-width fields into DataFrames.
    See PythonParser for details.
    """

    def __init__(self, f: ReadCsvBuffer[str], **kwds) -> None:
        # Support iterators, convert to a list.
        self.colspecs = kwds.pop("colspecs")
        self.infer_nrows = kwds.pop("infer_nrows")
        PythonParser.__init__(self, f, **kwds)

    def _make_reader(self, f: IO[str] | ReadCsvBuffer[str]) -> None:
        self.data = FixedWidthReader(
            f,
            self.colspecs,
            self.delimiter,
            self.comment,
            self.skiprows,
            self.infer_nrows,
        )

    def _remove_empty_lines(self, lines: list[list[Scalar]]) -> list[list[Scalar]]:
        """
        Returns the list of lines without the empty ones. With fixed-width
        fields, empty lines become arrays of empty strings.

        See PythonParser._remove_empty_lines.
        """
        return [
            line
            for line in lines
            if any(not isinstance(e, str) or e.strip() for e in line)
        ]


def count_empty_vals(vals) -> int:
    return sum(1 for v in vals if v == "" or v is None)
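
# Illustration (not part of the pandas source): both empty strings and None
# count as empty; other falsy values such as 0 do not.
#
#     >>> count_empty_vals(["a", "", None, 0])
#     2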


def _validate_skipfooter_arg(skipfooter: int) -> int:
    """
    Validate the 'skipfooter' parameter.

    Checks whether 'skipfooter' is a non-negative integer.
    Raises a ValueError if that is not the case.

    Parameters
    ----------
    skipfooter : non-negative integer
        The number of rows to skip at the end of the file.

    Returns
    -------
    validated_skipfooter : non-negative integer
        The original input if the validation succeeds.

    Raises
    ------
    ValueError : 'skipfooter' was not a non-negative integer.
    """
    if not is_integer(skipfooter):
        raise ValueError("skipfooter must be an integer")

    if skipfooter < 0:
        raise ValueError("skipfooter cannot be negative")

    return skipfooter
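
# Illustration (not part of the pandas source): the validator passes valid
# input through unchanged and raises on anything else.
#
#     >>> _validate_skipfooter_arg(2)
#     2
#     >>> _validate_skipfooter_arg(-1)
#     Traceback (most recent call last):
#         ...
#     ValueError: skipfooter cannot be negative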