Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/pandas/io/excel/_odfreader.py: 15%
142 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1from __future__ import annotations
3from typing import (
4 TYPE_CHECKING,
5 cast,
6)
8import numpy as np
10from pandas._typing import (
11 FilePath,
12 ReadBuffer,
13 Scalar,
14 StorageOptions,
15)
16from pandas.compat._optional import import_optional_dependency
17from pandas.util._decorators import doc
19import pandas as pd
20from pandas.core.shared_docs import _shared_docs
22from pandas.io.excel._base import BaseExcelReader
24if TYPE_CHECKING: 24 ↛ 25line 24 didn't jump to line 25, because the condition on line 24 was never true
25 from pandas._libs.tslibs.nattype import NaTType
@doc(storage_options=_shared_docs["storage_options"])
class ODFReader(BaseExcelReader):
    """
    Read tables out of OpenDocument formatted files.

    Parameters
    ----------
    filepath_or_buffer : str, path to be parsed or
        an open readable stream.
    {storage_options}
    """

    # NOTE: the class docstring above is passed through ``str.format`` by the
    # ``@doc`` decorator, so it must not contain any other curly braces.

    def __init__(
        self,
        filepath_or_buffer: FilePath | ReadBuffer[bytes],
        storage_options: StorageOptions = None,
    ) -> None:
        # Resolve the optional ``odf`` dependency before the base-class setup.
        import_optional_dependency("odf")
        super().__init__(filepath_or_buffer, storage_options=storage_options)

    @property
    def _workbook_class(self):
        """Return the ``odf.opendocument.OpenDocument`` class."""
        from odf.opendocument import OpenDocument

        return OpenDocument

    def load_workbook(self, filepath_or_buffer: FilePath | ReadBuffer[bytes]):
        """Load ``filepath_or_buffer`` with ``odf.opendocument.load``."""
        from odf.opendocument import load

        return load(filepath_or_buffer)

    @property
    def empty_value(self) -> str:
        """Property for compat with other readers."""
        return ""

    @property
    def sheet_names(self) -> list[str]:
        """Return a list of sheet names present in the document"""
        from odf.table import Table

        tables = self.book.getElementsByType(Table)
        return [t.getAttribute("name") for t in tables]

    def get_sheet_by_index(self, index: int):
        """
        Return the table element at position ``index`` in the document.

        ``raise_if_bad_sheet_by_index`` is called first to validate ``index``.
        """
        from odf.table import Table

        self.raise_if_bad_sheet_by_index(index)
        tables = self.book.getElementsByType(Table)
        return tables[index]

    def get_sheet_by_name(self, name: str):
        """
        Return the first table element whose ``name`` attribute equals ``name``.

        Raises
        ------
        ValueError
            If no table carries that name; the reader is closed first.
        """
        from odf.table import Table

        self.raise_if_bad_sheet_by_name(name)
        tables = self.book.getElementsByType(Table)

        for table in tables:
            if table.getAttribute("name") == name:
                return table

        self.close()
        raise ValueError(f"sheet {name} not found")

    def get_sheet_data(
        self, sheet, convert_float: bool, file_rows_needed: int | None = None
    ) -> list[list[Scalar | NaTType]]:
        """
        Parse an ODF Table into a list of lists

        Parameters
        ----------
        sheet
            Table element whose ``TableRow`` elements are read.
        convert_float : bool
            Forwarded to ``_get_cell_value``.
        file_rows_needed : int, optional
            Stop reading once at least this many rows have been collected.

        Returns
        -------
        list of list
            One independent list per row, each padded with ``empty_value``
            up to the widest row seen. Empty cells at the end of a row and
            empty rows at the end of the sheet are never written out.
        """
        from odf.table import (
            CoveredTableCell,
            TableCell,
            TableRow,
        )

        covered_cell_name = CoveredTableCell().qname
        table_cell_name = TableCell().qname
        cell_names = {covered_cell_name, table_cell_name}

        sheet_rows = sheet.getElementsByType(TableRow)
        empty_rows = 0
        max_row_len = 0

        table: list[list[Scalar | NaTType]] = []

        for sheet_row in sheet_rows:
            # Only real cells and covered (merged-away) cells are considered.
            sheet_cells = [
                x
                for x in sheet_row.childNodes
                if hasattr(x, "qname") and x.qname in cell_names
            ]
            empty_cells = 0
            table_row: list[Scalar | NaTType] = []

            for sheet_cell in sheet_cells:
                if sheet_cell.qname == table_cell_name:
                    value = self._get_cell_value(sheet_cell, convert_float)
                else:
                    value = self.empty_value

                column_repeat = self._get_column_repeat(sheet_cell)

                # Queue up empty values, writing only if content succeeds them
                if value == self.empty_value:
                    empty_cells += column_repeat
                else:
                    table_row.extend([self.empty_value] * empty_cells)
                    empty_cells = 0
                    table_row.extend([value] * column_repeat)

            if max_row_len < len(table_row):
                max_row_len = len(table_row)

            row_repeat = self._get_row_repeat(sheet_row)
            if self._is_empty_row(sheet_row):
                empty_rows += row_repeat
            else:
                # Add the queued blank rows to our table. Every row must be
                # its own list: ``[[x]] * n`` or appending ``table_row``
                # repeatedly would store one shared object n times, so an
                # in-place edit of one row would leak into its duplicates.
                table.extend([self.empty_value] for _ in range(empty_rows))
                empty_rows = 0
                table.extend(list(table_row) for _ in range(row_repeat))
            if file_rows_needed is not None and len(table) >= file_rows_needed:
                break

        # Make our table square
        for row in table:
            if len(row) < max_row_len:
                row.extend([self.empty_value] * (max_row_len - len(row)))

        return table

    def _get_row_repeat(self, row) -> int:
        """
        Return number of times this row was repeated
        Repeating an empty row appeared to be a common way
        of representing sparse rows in the table.

        Defaults to 1 when the ``number-rows-repeated`` attribute is absent.
        """
        from odf.namespaces import TABLENS

        return int(row.attributes.get((TABLENS, "number-rows-repeated"), 1))

    def _get_column_repeat(self, cell) -> int:
        """
        Return number of times this cell was repeated

        Defaults to 1 when the ``number-columns-repeated`` attribute is absent.
        """
        from odf.namespaces import TABLENS

        return int(cell.attributes.get((TABLENS, "number-columns-repeated"), 1))

    def _is_empty_row(self, row) -> bool:
        """
        Helper function to find empty rows

        A row counts as empty when none of its child nodes has children.
        """
        for column in row.childNodes:
            if len(column.childNodes) > 0:
                return False

        return True

    def _get_cell_value(self, cell, convert_float: bool) -> Scalar | NaTType:
        """
        Convert a table cell into a Python value based on its value-type.

        Parameters
        ----------
        cell
            Table cell element; its attributes and text content are read.
        convert_float : bool
            If True, a "float" cell whose value is integral is returned as int.

        Returns
        -------
        scalar
            ``np.nan`` for a cell whose text is "#N/A", ``empty_value`` for a
            cell without a value-type, otherwise the decoded value.

        Raises
        ------
        ValueError
            If the value-type is not handled; the reader is closed first.
        """
        from odf.namespaces import OFFICENS

        if str(cell) == "#N/A":
            return np.nan

        cell_type = cell.attributes.get((OFFICENS, "value-type"))
        if cell_type == "boolean":
            # Prefer the machine-readable attribute: the display text is
            # locale and format dependent (e.g. "WAHR"), so comparing it
            # with "TRUE" misreads true cells in non-English documents.
            bool_attr = cell.attributes.get((OFFICENS, "boolean-value"))
            if bool_attr is not None:
                return str(bool_attr).strip().lower() == "true"
            # No attribute present: fall back to the displayed text.
            return str(cell) == "TRUE"
        if cell_type is None:
            return self.empty_value
        elif cell_type == "float":
            # GH5394
            cell_value = float(cell.attributes.get((OFFICENS, "value")))
            if convert_float:
                val = int(cell_value)
                if val == cell_value:
                    return val
            return cell_value
        elif cell_type == "percentage":
            cell_value = cell.attributes.get((OFFICENS, "value"))
            return float(cell_value)
        elif cell_type == "string":
            return self._get_cell_string_value(cell)
        elif cell_type == "currency":
            cell_value = cell.attributes.get((OFFICENS, "value"))
            return float(cell_value)
        elif cell_type == "date":
            cell_value = cell.attributes.get((OFFICENS, "date-value"))
            return pd.to_datetime(cell_value)
        elif cell_type == "time":
            stamp = pd.to_datetime(str(cell))
            # cast needed here because Scalar doesn't include datetime.time
            return cast(Scalar, stamp.time())
        else:
            self.close()
            raise ValueError(f"Unrecognized type {cell_type}")

    def _get_cell_string_value(self, cell) -> str:
        """
        Find and decode OpenDocument text:s tags that represent
        a run length encoded sequence of space characters.

        Returns the concatenated text of all child nodes of ``cell``.
        """
        from odf.element import Element
        from odf.namespaces import TEXTNS
        from odf.text import S

        text_s = S().qname

        value = []

        for fragment in cell.childNodes:
            if isinstance(fragment, Element):
                if fragment.qname == text_s:
                    # text:s stands for "c" spaces (one when "c" is absent).
                    spaces = int(fragment.attributes.get((TEXTNS, "c"), 1))
                    value.append(" " * spaces)
                else:
                    # recursive impl needed in case of nested fragments
                    # with multiple spaces
                    # https://github.com/pandas-dev/pandas/pull/36175#discussion_r484639704
                    value.append(self._get_cell_string_value(fragment))
            else:
                value.append(str(fragment).strip("\n"))
        return "".join(value)