Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/pandas/core/indexers/objects.py: 27%
130 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1"""Indexer objects for computing start/end window bounds for rolling operations"""
2from __future__ import annotations
4from datetime import timedelta
6import numpy as np
8from pandas._libs.window.indexers import calculate_variable_window_bounds
9from pandas.util._decorators import Appender
11from pandas.core.dtypes.common import ensure_platform_int
13from pandas.tseries.offsets import Nano
# Shared parameter/return documentation. The indexer classes in this module
# attach it to their ``get_window_bounds`` methods with
# ``@Appender(get_window_bounds_doc)``.
get_window_bounds_doc = """
Computes the bounds of a window.

Parameters
----------
num_values : int, default 0
    number of values that will be aggregated over
window_size : int, default 0
    the number of rows in a window
min_periods : int, default None
    min_periods passed from the top level rolling API
center : bool, default None
    center passed from the top level rolling API
closed : str, default None
    closed passed from the top level rolling API
step : int, default None
    step passed from the top level rolling API
    .. versionadded:: 1.5
win_type : str, default None
    win_type passed from the top level rolling API

Returns
-------
A tuple of ndarray[int64]s, indicating the boundaries of each
window
"""
class BaseIndexer:
    """Common parent of the objects that compute window start/end bounds."""

    def __init__(
        self, index_array: np.ndarray | None = None, window_size: int = 0, **kwargs
    ) -> None:
        """
        Parameters
        ----------
        index_array : np.ndarray or None, default None
            stored unchanged on the instance as ``index_array``
        window_size : int, default 0
            stored unchanged on the instance as ``window_size``
        **kwargs :
            extra keyword arguments; each one becomes an instance attribute
            so that it can be read when get_window_bounds is called
        """
        self.index_array = index_array
        self.window_size = window_size
        # Expose every user supplied keyword as an attribute of the same name.
        for attr_name, attr_value in kwargs.items():
            setattr(self, attr_name, attr_value)

    @Appender(get_window_bounds_doc)
    def get_window_bounds(
        self,
        num_values: int = 0,
        min_periods: int | None = None,
        center: bool | None = None,
        closed: str | None = None,
        step: int | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        # The base class only fixes the signature; it performs no computation.
        raise NotImplementedError
class FixedWindowIndexer(BaseIndexer):
    """Compute bounds for windows that span a fixed number of rows."""

    @Appender(get_window_bounds_doc)
    def get_window_bounds(
        self,
        num_values: int = 0,
        min_periods: int | None = None,
        center: bool | None = None,
        closed: str | None = None,
        step: int | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        # A centered window has its right edge pushed forward by half the
        # window width (rounded down); otherwise there is no shift.
        shift = (self.window_size - 1) // 2 if center else 0

        # One exclusive right edge per ``step`` rows; the matching left edge
        # sits ``window_size`` rows before it.
        right = np.arange(1 + shift, num_values + 1 + shift, step, dtype="int64")
        left = right - self.window_size

        # ``closed`` moves an edge back by one row on the affected side(s).
        if closed in ("left", "both"):
            left -= 1
        if closed in ("left", "neither"):
            right -= 1

        # Keep both edges inside the valid slice range [0, num_values].
        return np.clip(left, 0, num_values), np.clip(right, 0, num_values)
class VariableWindowIndexer(BaseIndexer):
    """Compute bounds for windows of varying length, e.g. over a time series."""

    @Appender(get_window_bounds_doc)
    def get_window_bounds(
        self,
        num_values: int = 0,
        min_periods: int | None = None,
        center: bool | None = None,
        closed: str | None = None,
        step: int | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        # The whole computation is delegated to calculate_variable_window_bounds;
        # ``step`` is not forwarded to it.
        width = self.window_size
        positions = self.index_array

        # error: Argument 4 to "calculate_variable_window_bounds" has incompatible
        # type "Optional[bool]"; expected "bool"
        # error: Argument 6 to "calculate_variable_window_bounds" has incompatible
        # type "Optional[ndarray]"; expected "ndarray"
        bounds = calculate_variable_window_bounds(
            num_values,
            width,
            min_periods,
            center,  # type: ignore[arg-type]
            closed,
            positions,  # type: ignore[arg-type]
        )
        return bounds
class VariableOffsetWindowIndexer(BaseIndexer):
    """Compute bounds for windows sized by a non-fixed offset, e.g. a BusinessDay."""

    def __init__(
        self,
        index_array: np.ndarray | None = None,
        window_size: int = 0,
        index=None,
        offset=None,
        **kwargs,
    ) -> None:
        # ``index`` supplies the labels compared in get_window_bounds and
        # ``offset`` is the amount subtracted from each label to find the
        # left bound of its window.
        super().__init__(index_array, window_size, **kwargs)
        self.index = index
        self.offset = offset

    @Appender(get_window_bounds_doc)
    def get_window_bounds(
        self,
        num_values: int = 0,
        min_periods: int | None = None,
        center: bool | None = None,
        closed: str | None = None,
        step: int | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        if step is not None:
            raise NotImplementedError("step not implemented for variable offset window")
        if num_values <= 0:
            return np.empty(0, dtype="int64"), np.empty(0, dtype="int64")

        # if windows is variable, default is 'right', otherwise default is 'both'
        if closed is None:
            closed = "right" if self.index is not None else "both"

        include_right = closed in ("right", "both")
        include_left = closed in ("left", "both")

        labels = self.index
        # -1 when the last label is smaller than the first one, +1 otherwise.
        direction = -1 if labels[num_values - 1] < labels[0] else 1
        zero = timedelta(0)

        # -1 marks a slot that has not been computed yet.
        lo = np.full(num_values, -1, dtype="int64")
        hi = np.full(num_values, -1, dtype="int64")

        # First row: the window starts at 0 and holds the row itself only
        # when the right endpoint is closed.
        lo[0] = 0
        hi[0] = 1 if include_right else 0

        # lo is the (inclusive) start of the slice, hi its (exclusive) end
        for pos in range(1, num_values):
            upper = labels[pos]
            lower = upper - direction * self.offset

            # A closed left endpoint is modelled by widening the bound by 1ns.
            if include_left:
                lower -= Nano(1)

            # Resume scanning from the previous start; take the first label
            # that lies strictly inside the bound, else fall back to ``pos``.
            lo[pos] = pos
            for cand in range(lo[pos - 1], pos):
                if (labels[cand] - lower) * direction > zero:
                    lo[pos] = cand
                    break

            # The end is either the previous end or just past the current row.
            if (labels[hi[pos - 1]] - upper) * direction <= zero:
                hi[pos] = pos + 1
            else:
                hi[pos] = hi[pos - 1]

            # An open right endpoint drops the last row of the slice.
            if not include_right:
                hi[pos] -= 1

        return lo, hi
class ExpandingIndexer(BaseIndexer):
    """Compute bounds for expanding windows, mimicking df.expanding()"""

    @Appender(get_window_bounds_doc)
    def get_window_bounds(
        self,
        num_values: int = 0,
        min_periods: int | None = None,
        center: bool | None = None,
        closed: str | None = None,
        step: int | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        # Every window starts at row 0 ...
        starts = np.zeros(num_values, dtype=np.int64)
        # ... and window i ends just past row i, i.e. at 1, 2, ..., num_values.
        stops = np.arange(1, num_values + 1, dtype=np.int64)
        return starts, stops
class FixedForwardWindowIndexer(BaseIndexer):
    """
    Creates window boundaries for fixed-length windows that include the current row.

    Examples
    --------
    >>> df = pd.DataFrame({'B': [0, 1, 2, np.nan, 4]})
    >>> df
         B
    0  0.0
    1  1.0
    2  2.0
    3  NaN
    4  4.0

    >>> indexer = pd.api.indexers.FixedForwardWindowIndexer(window_size=2)
    >>> df.rolling(window=indexer, min_periods=1).sum()
         B
    0  1.0
    1  3.0
    2  2.0
    3  4.0
    4  4.0
    """

    @Appender(get_window_bounds_doc)
    def get_window_bounds(
        self,
        num_values: int = 0,
        min_periods: int | None = None,
        center: bool | None = None,
        closed: str | None = None,
        step: int | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        # Centering and ``closed`` are rejected for forward-looking windows.
        if center:
            raise ValueError("Forward-looking windows can't have center=True")
        if closed is not None:
            raise ValueError(
                "Forward-looking windows don't support setting the closed argument"
            )
        stride = 1 if step is None else step

        # Window i begins at its own row and reaches ``window_size`` rows ahead.
        left = np.arange(0, num_values, stride, dtype="int64")
        right = left + self.window_size

        # A zero-sized window is returned as is; any other size has its
        # right edges clipped to [0, num_values].
        if not self.window_size:
            return left, right
        return left, np.clip(right, 0, num_values)
class GroupbyIndexer(BaseIndexer):
    """Calculate bounds to compute groupby rolling, mimicking df.groupby().rolling()"""

    def __init__(
        self,
        index_array: np.ndarray | None = None,
        window_size: int | BaseIndexer = 0,
        groupby_indices: dict | None = None,
        window_indexer: type[BaseIndexer] = BaseIndexer,
        indexer_kwargs: dict | None = None,
        **kwargs,
    ) -> None:
        """
        Parameters
        ----------
        index_array : np.ndarray or None
            np.ndarray of the index of the original object that we are performing
            a chained groupby operation over. This index has been pre-sorted relative to
            the groups
        window_size : int or BaseIndexer
            window size during the windowing operation
        groupby_indices : dict or None
            dict of {group label: [positional index of rows belonging to the group]}
        window_indexer : BaseIndexer
            BaseIndexer class determining the start and end bounds of each group
        indexer_kwargs : dict or None
            Custom kwargs to be passed to window_indexer
        **kwargs :
            keyword arguments that will be available when get_window_bounds is called
        """
        self.groupby_indices = groupby_indices or {}
        self.window_indexer = window_indexer
        # Work on a copy so that popping "window_size" below never mutates the
        # caller's dict.
        self.indexer_kwargs = indexer_kwargs.copy() if indexer_kwargs else {}
        # A "window_size" entry in indexer_kwargs takes precedence over the
        # window_size argument.
        super().__init__(
            index_array=index_array,
            window_size=self.indexer_kwargs.pop("window_size", window_size),
            **kwargs,
        )

    @Appender(get_window_bounds_doc)
    def get_window_bounds(
        self,
        num_values: int = 0,
        min_periods: int | None = None,
        center: bool | None = None,
        closed: str | None = None,
        step: int | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        # 1) For each group, get the indices that belong to the group
        # 2) Use the indices to calculate the start & end bounds of the window
        # 3) Append the window bounds in group order
        start_arrays = []
        end_arrays = []
        window_indices_start = 0
        for indices in self.groupby_indices.values():
            group_size = len(indices)

            index_array: np.ndarray | None
            if self.index_array is not None:
                index_array = self.index_array.take(ensure_platform_int(indices))
            else:
                index_array = self.index_array

            # A fresh per-group indexer computes bounds relative to the group.
            indexer = self.window_indexer(
                index_array=index_array,
                window_size=self.window_size,
                **self.indexer_kwargs,
            )
            start, end = indexer.get_window_bounds(
                group_size, min_periods, center, closed, step
            )
            start = start.astype(np.int64)
            end = end.astype(np.int64)
            assert len(start) == len(
                end
            ), "these should be equal in length from get_window_bounds"

            # Cannot use groupby_indices as they might not be monotonic with the
            # object we're rolling over, so translate the group-relative bounds
            # through a table of consecutive positions instead. The table has
            # one extra trailing entry because windows are sliced as
            # [start, end). Building it in a single arange (rather than
            # appending ``last + 1``) also works for a group with no rows.
            window_indices = np.arange(
                window_indices_start,
                window_indices_start + group_size + 1,
                dtype=np.int64,
            )
            window_indices_start += group_size

            start_arrays.append(window_indices.take(ensure_platform_int(start)))
            end_arrays.append(window_indices.take(ensure_platform_int(end)))

        # No groups at all: return empty bounds.
        if not start_arrays:
            return np.array([], dtype=np.int64), np.array([], dtype=np.int64)
        return np.concatenate(start_arrays), np.concatenate(end_arrays)
class ExponentialMovingWindowIndexer(BaseIndexer):
    """Compute ewm window bounds: one window covering every value."""

    @Appender(get_window_bounds_doc)
    def get_window_bounds(
        self,
        num_values: int = 0,
        min_periods: int | None = None,
        center: bool | None = None,
        closed: str | None = None,
        step: int | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        # A single window: it starts at row 0 and ends at num_values (exclusive).
        first = np.array([0], dtype=np.int64)
        last = np.array([num_values], dtype=np.int64)
        return first, last