Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/pandas/io/excel/_odfreader.py: 15%

142 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1from __future__ import annotations 

2 

3from typing import ( 

4 TYPE_CHECKING, 

5 cast, 

6) 

7 

8import numpy as np 

9 

10from pandas._typing import ( 

11 FilePath, 

12 ReadBuffer, 

13 Scalar, 

14 StorageOptions, 

15) 

16from pandas.compat._optional import import_optional_dependency 

17from pandas.util._decorators import doc 

18 

19import pandas as pd 

20from pandas.core.shared_docs import _shared_docs 

21 

22from pandas.io.excel._base import BaseExcelReader 

23 

24if TYPE_CHECKING: 24 ↛ 25line 24 didn't jump to line 25, because the condition on line 24 was never true

25 from pandas._libs.tslibs.nattype import NaTType 

26 

27 

@doc(storage_options=_shared_docs["storage_options"])
class ODFReader(BaseExcelReader):
    """
    Read tables out of OpenDocument formatted files.

    Parameters
    ----------
    filepath_or_buffer : str, path to be parsed or
        an open readable stream.
    {storage_options}
    """

    def __init__(
        self,
        filepath_or_buffer: FilePath | ReadBuffer[bytes],
        storage_options: StorageOptions = None,
    ) -> None:
        # Fail early with a helpful message when odfpy is not installed.
        import_optional_dependency("odf")
        super().__init__(filepath_or_buffer, storage_options=storage_options)

    @property
    def _workbook_class(self):
        """Return the odfpy class that represents an opened document."""
        from odf.opendocument import OpenDocument

        return OpenDocument

    def load_workbook(self, filepath_or_buffer: FilePath | ReadBuffer[bytes]):
        """Load the document at ``filepath_or_buffer`` with odfpy."""
        from odf.opendocument import load

        return load(filepath_or_buffer)

    @property
    def empty_value(self) -> str:
        """Property for compat with other readers."""
        return ""

    @property
    def sheet_names(self) -> list[str]:
        """Return a list of sheet names present in the document"""
        from odf.table import Table

        tables = self.book.getElementsByType(Table)
        return [t.getAttribute("name") for t in tables]

    def get_sheet_by_index(self, index: int):
        """Return the table at position ``index`` after validating the index."""
        from odf.table import Table

        self.raise_if_bad_sheet_by_index(index)
        tables = self.book.getElementsByType(Table)
        return tables[index]

    def get_sheet_by_name(self, name: str):
        """
        Return the table whose ``name`` attribute equals ``name``.

        Raises
        ------
        ValueError
            If no table carries that name; the reader is closed first.
        """
        from odf.table import Table

        self.raise_if_bad_sheet_by_name(name)
        tables = self.book.getElementsByType(Table)

        for table in tables:
            if table.getAttribute("name") == name:
                return table

        self.close()
        raise ValueError(f"sheet {name} not found")

    def get_sheet_data(
        self, sheet, convert_float: bool, file_rows_needed: int | None = None
    ) -> list[list[Scalar | NaTType]]:
        """
        Parse an ODF Table into a list of lists

        Parameters
        ----------
        sheet
            Table element to read rows from.
        convert_float : bool
            Passed through to ``_get_cell_value``; integral floats become int.
        file_rows_needed : int, optional
            Stop reading once at least this many rows have been collected.

        Returns
        -------
        list of list
            Rectangular table; short rows are padded with ``empty_value``.
            Trailing empty cells and trailing empty rows are not emitted.
            Every row is an independent list object.
        """
        from odf.table import (
            CoveredTableCell,
            TableCell,
            TableRow,
        )

        covered_cell_name = CoveredTableCell().qname
        table_cell_name = TableCell().qname
        cell_names = {covered_cell_name, table_cell_name}

        sheet_rows = sheet.getElementsByType(TableRow)
        empty_rows = 0
        max_row_len = 0

        table: list[list[Scalar | NaTType]] = []

        for sheet_row in sheet_rows:
            sheet_cells = [
                x
                for x in sheet_row.childNodes
                if hasattr(x, "qname") and x.qname in cell_names
            ]
            empty_cells = 0
            table_row: list[Scalar | NaTType] = []

            for sheet_cell in sheet_cells:
                if sheet_cell.qname == table_cell_name:
                    value = self._get_cell_value(sheet_cell, convert_float)
                else:
                    # covered (merged-away) cells carry no value of their own
                    value = self.empty_value

                column_repeat = self._get_column_repeat(sheet_cell)

                # Queue up empty values, writing only if content succeeds them
                if value == self.empty_value:
                    empty_cells += column_repeat
                else:
                    table_row.extend([self.empty_value] * empty_cells)
                    empty_cells = 0
                    table_row.extend([value] * column_repeat)

            max_row_len = max(max_row_len, len(table_row))

            row_repeat = self._get_row_repeat(sheet_row)
            # Decide emptiness from what was actually parsed: a cell may hold
            # a value attribute without any child node, so inspecting child
            # nodes alone would silently discard such rows.
            if not table_row:
                empty_rows += row_repeat
            else:
                # add blank rows to our table; each row is its own list so
                # later in-place edits of one row cannot leak into another
                table.extend([self.empty_value] for _ in range(empty_rows))
                empty_rows = 0
                table.extend(list(table_row) for _ in range(row_repeat))
            if file_rows_needed is not None and len(table) >= file_rows_needed:
                break

        # Make our table square
        for row in table:
            if len(row) < max_row_len:
                row.extend([self.empty_value] * (max_row_len - len(row)))

        return table

    def _get_row_repeat(self, row) -> int:
        """
        Return number of times this row was repeated
        Repeating an empty row appeared to be a common way
        of representing sparse rows in the table.
        """
        from odf.namespaces import TABLENS

        return int(row.attributes.get((TABLENS, "number-rows-repeated"), 1))

    def _get_column_repeat(self, cell) -> int:
        """Return number of times this cell was repeated (1 when unspecified)."""
        from odf.namespaces import TABLENS

        return int(cell.attributes.get((TABLENS, "number-columns-repeated"), 1))

    def _is_empty_row(self, row) -> bool:
        """
        Helper function to find empty rows

        A row counts as empty when none of its children has child nodes.
        Kept for backward compatibility; ``get_sheet_data`` decides emptiness
        from the parsed cell values instead.
        """
        return all(len(column.childNodes) == 0 for column in row.childNodes)

    def _get_cell_value(self, cell, convert_float: bool) -> Scalar | NaTType:
        """
        Convert a table cell into a Python / pandas scalar.

        Parameters
        ----------
        cell
            Table cell element.
        convert_float : bool
            When True, floats holding an integral value are returned as int.

        Raises
        ------
        ValueError
            If the cell declares an unsupported value type; the reader is
            closed first.
        """
        from odf.namespaces import OFFICENS

        if str(cell) == "#N/A":
            return np.nan

        cell_type = cell.attributes.get((OFFICENS, "value-type"))
        if cell_type == "boolean":
            # Prefer the typed attribute: the displayed text depends on the
            # locale the document was saved with (e.g. "WAHR" instead of
            # "TRUE"). Fall back to the text when the attribute is absent.
            bool_attr = cell.attributes.get((OFFICENS, "boolean-value"))
            if bool_attr is not None:
                return str(bool_attr).lower() == "true"
            return str(cell) == "TRUE"
        if cell_type is None:
            return self.empty_value
        elif cell_type == "float":
            # GH5394
            cell_value = float(cell.attributes.get((OFFICENS, "value")))
            # is_integer() is False for NaN/INF, where int() would raise
            if convert_float and cell_value.is_integer():
                return int(cell_value)
            return cell_value
        elif cell_type == "percentage":
            cell_value = cell.attributes.get((OFFICENS, "value"))
            return float(cell_value)
        elif cell_type == "string":
            return self._get_cell_string_value(cell)
        elif cell_type == "currency":
            cell_value = cell.attributes.get((OFFICENS, "value"))
            return float(cell_value)
        elif cell_type == "date":
            cell_value = cell.attributes.get((OFFICENS, "date-value"))
            return pd.to_datetime(cell_value)
        elif cell_type == "time":
            # NOTE(review): parsed from the displayed text, not a typed attribute
            stamp = pd.to_datetime(str(cell))
            # cast needed here because Scalar doesn't include datetime.time
            return cast(Scalar, stamp.time())
        else:
            self.close()
            raise ValueError(f"Unrecognized type {cell_type}")

    def _get_cell_string_value(self, cell) -> str:
        """
        Find and decode OpenDocument text:s tags that represent
        a run length encoded sequence of space characters.
        """
        from odf.element import Element
        from odf.namespaces import TEXTNS
        from odf.text import S

        text_s = S().qname

        value = []

        for fragment in cell.childNodes:
            if isinstance(fragment, Element):
                if fragment.qname == text_s:
                    spaces = int(fragment.attributes.get((TEXTNS, "c"), 1))
                    value.append(" " * spaces)
                else:
                    # recursive impl needed in case of nested fragments
                    # with multiple spaces
                    # https://github.com/pandas-dev/pandas/pull/36175#discussion_r484639704
                    value.append(self._get_cell_string_value(fragment))
            else:
                value.append(str(fragment).strip("\n"))
        return "".join(value)