Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/pandas/io/parsers/arrow_parser_wrapper.py: 16%

65 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1from __future__ import annotations 

2 

3from typing import TYPE_CHECKING 

4 

5from pandas._typing import ReadBuffer 

6from pandas.compat._optional import import_optional_dependency 

7 

8from pandas.core.dtypes.inference import is_integer 

9 

10from pandas.io.parsers.base_parser import ParserBase 

11 

12if TYPE_CHECKING: 12 ↛ 13line 12 didn't jump to line 13, because the condition on line 12 was never true

13 from pandas import DataFrame 

14 

15 

class ArrowParserWrapper(ParserBase):
    """
    Wrapper for the pyarrow engine for read_csv()
    """

    def __init__(self, src: ReadBuffer[bytes], **kwds) -> None:
        """
        Store the source and the keyword arguments, then validate them.

        Parameters
        ----------
        src : ReadBuffer[bytes]
            Source handed unchanged to ``pyarrow.csv.read_csv`` by ``read``.
        **kwds
            Parser options. The same dict is passed to the base-class
            initializer and kept on the instance as ``self.kwds``.
        """
        super().__init__(kwds)
        self.kwds = kwds
        self.src = src

        self._parse_kwds()

    def _parse_kwds(self):
        """
        Validates keywords before passing to pyarrow.

        Sets ``self.encoding`` (``"utf-8"`` when no encoding was given),
        ``self.usecols``, ``self.usecols_dtype`` and ``self.na_values``.

        Raises
        ------
        ValueError
            If ``na_values`` is a dict.
        """
        encoding: str | None = self.kwds.get("encoding")
        self.encoding = "utf-8" if encoding is None else encoding

        # _validate_usecols_arg is not defined in this class; presumably it is
        # inherited from ParserBase -- confirm its contract there.
        self.usecols, self.usecols_dtype = self._validate_usecols_arg(
            self.kwds["usecols"]
        )
        na_values = self.kwds["na_values"]
        if isinstance(na_values, dict):
            raise ValueError(
                "The pyarrow engine doesn't support passing a dict for na_values"
            )
        self.na_values = list(na_values)

    def _get_pyarrow_options(self):
        """
        Rename some arguments to pass to pyarrow

        Renames pandas keywords in ``self.kwds`` (in place) to their pyarrow
        names and then builds ``self.parse_options``, ``self.convert_options``
        and ``self.read_options``, the keyword dicts that ``read`` unpacks
        into the matching ``pyarrow.csv`` option classes.
        """
        mapping = {
            "usecols": "include_columns",
            "na_values": "null_values",
            "escapechar": "escape_char",
            "skip_blank_lines": "ignore_empty_lines",
        }
        for pandas_name, pyarrow_name in mapping.items():
            # Only rename keywords that were actually given a value; a None
            # entry stays under its pandas name and is filtered out below.
            if self.kwds.get(pandas_name) is not None:
                self.kwds[pyarrow_name] = self.kwds.pop(pandas_name)

        parse_option_names = (
            "delimiter",
            "quote_char",
            "escape_char",
            "ignore_empty_lines",
        )
        convert_option_names = (
            "include_columns",
            "null_values",
            "true_values",
            "false_values",
        )

        self.parse_options = {
            option_name: option_value
            for option_name, option_value in self.kwds.items()
            if option_value is not None and option_name in parse_option_names
        }
        self.convert_options = {
            option_name: option_value
            for option_name, option_value in self.kwds.items()
            if option_value is not None and option_name in convert_option_names
        }
        self.read_options = {
            "autogenerate_column_names": self.header is None,
            "skip_rows": self.header
            if self.header is not None
            else self.kwds["skiprows"],
            # Forward the encoding resolved in _parse_kwds; without this the
            # user-supplied encoding never reaches pyarrow.
            "encoding": self.encoding,
        }

    def _finalize_output(self, frame: DataFrame) -> DataFrame:
        """
        Processes data read in based on kwargs.

        Parameters
        ----------
        frame: DataFrame
            The DataFrame to process.

        Returns
        -------
        DataFrame
            The processed DataFrame.

        Raises
        ------
        ValueError
            If a non-integer ``index_col`` entry is not a column of the
            frame, or if casting to the requested ``dtype`` raises
            ``TypeError``.
        """
        num_cols = len(frame.columns)
        multi_index_named = True
        if self.header is None:
            if self.names is None:
                if self.prefix is not None:
                    self.names = [f"{self.prefix}{i}" for i in range(num_cols)]
                else:
                    self.names = range(num_cols)
            if len(self.names) != num_cols:
                # usecols is passed through to pyarrow, we only handle index col here
                # The only way self.names is not the same length as number of cols is
                # if we have int index_col. We should just pad the names(they will get
                # removed anyways) to expected length then.
                # list() lets the padding also work for non-list sequences
                # such as tuples.
                self.names = list(range(num_cols - len(self.names))) + list(
                    self.names
                )
                multi_index_named = False
            frame.columns = self.names
        # we only need the frame not the names
        frame.columns, frame = self._do_date_conversions(frame.columns, frame)
        if self.index_col is not None:
            for i, item in enumerate(self.index_col):
                if is_integer(item):
                    # Replace positional references with the column label.
                    self.index_col[i] = frame.columns[item]
                else:
                    # String case
                    if item not in frame.columns:
                        raise ValueError(f"Index {item} invalid")
            frame.set_index(self.index_col, drop=True, inplace=True)
            # Clear names if headerless and no name given
            if self.header is None and not multi_index_named:
                frame.index.names = [None] * len(frame.index.names)

        dtype = self.kwds.get("dtype")
        if dtype is not None:
            try:
                frame = frame.astype(dtype)
            except TypeError as e:
                # GH#44901 reraise to keep api consistent
                raise ValueError(e) from e
        return frame

    def read(self) -> DataFrame:
        """
        Reads the contents of a CSV file into a DataFrame and
        processes it according to the kwargs passed in the
        constructor.

        Returns
        -------
        DataFrame
            The DataFrame created from the CSV file.
        """
        pyarrow_csv = import_optional_dependency("pyarrow.csv")
        self._get_pyarrow_options()

        table = pyarrow_csv.read_csv(
            self.src,
            read_options=pyarrow_csv.ReadOptions(**self.read_options),
            parse_options=pyarrow_csv.ParseOptions(**self.parse_options),
            convert_options=pyarrow_csv.ConvertOptions(**self.convert_options),
        )

        frame = table.to_pandas()
        return self._finalize_output(frame)