Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/pandas/core/indexers/objects.py: 27%

130 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1"""Indexer objects for computing start/end window bounds for rolling operations""" 

2from __future__ import annotations 

3 

4from datetime import timedelta 

5 

6import numpy as np 

7 

8from pandas._libs.window.indexers import calculate_variable_window_bounds 

9from pandas.util._decorators import Appender 

10 

11from pandas.core.dtypes.common import ensure_platform_int 

12 

13from pandas.tseries.offsets import Nano 

14 

# Shared numpydoc fragment handed to the ``Appender`` decorator on each
# ``get_window_bounds`` implementation in this module.
get_window_bounds_doc = """
Computes the bounds of a window.

Parameters
----------
num_values : int, default 0
    number of values that will be aggregated over
window_size : int, default 0
    the number of rows in a window
min_periods : int, default None
    min_periods passed from the top level rolling API
center : bool, default None
    center passed from the top level rolling API
closed : str, default None
    closed passed from the top level rolling API
step : int, default None
    step passed from the top level rolling API

    .. versionadded:: 1.5
win_type : str, default None
    win_type passed from the top level rolling API

Returns
-------
A tuple of ndarray[int64]s, indicating the boundaries of each
window
"""

41 

42 

class BaseIndexer:
    """Base class for window bounds calculations."""

    def __init__(
        self, index_array: np.ndarray | None = None, window_size: int = 0, **kwargs
    ) -> None:
        """
        Parameters
        ----------
        index_array : np.ndarray or None, default None
            stored unchanged on the instance as ``index_array``
        window_size : int, default 0
            stored unchanged on the instance as ``window_size``
        **kwargs :
            keyword arguments that will be available when get_window_bounds is called
        """
        self.index_array = index_array
        self.window_size = window_size
        # Mirror every user-supplied keyword onto the instance so that a
        # get_window_bounds implementation can read it back from ``self``.
        for attr_name in kwargs:
            setattr(self, attr_name, kwargs[attr_name])

    @Appender(get_window_bounds_doc)
    def get_window_bounds(
        self,
        num_values: int = 0,
        min_periods: int | None = None,
        center: bool | None = None,
        closed: str | None = None,
        step: int | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        # The base class provides no bounds computation of its own.
        raise NotImplementedError

72 

73 

class FixedWindowIndexer(BaseIndexer):
    """Creates window boundaries that are of fixed length."""

    @Appender(get_window_bounds_doc)
    def get_window_bounds(
        self,
        num_values: int = 0,
        min_periods: int | None = None,
        center: bool | None = None,
        closed: str | None = None,
        step: int | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        # Resolve the default stride explicitly (as FixedForwardWindowIndexer
        # does) instead of handing ``None`` to np.arange.
        if step is None:
            step = 1

        # A centered window is shifted forward by half its width.
        offset = (self.window_size - 1) // 2 if center else 0

        # ``end`` is exclusive; ``start`` trails it by the window width.
        end = np.arange(1 + offset, num_values + 1 + offset, step, dtype="int64")
        start = end - self.window_size
        if closed in ("left", "both"):
            start -= 1
        if closed in ("left", "neither"):
            end -= 1

        # Keep both bounds inside [0, num_values].
        return np.clip(start, 0, num_values), np.clip(end, 0, num_values)

103 

104 

class VariableWindowIndexer(BaseIndexer):
    """Creates window boundaries that are of variable length, namely for time series."""

    @Appender(get_window_bounds_doc)
    def get_window_bounds(
        self,
        num_values: int = 0,
        min_periods: int | None = None,
        center: bool | None = None,
        closed: str | None = None,
        step: int | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        # The computation is delegated entirely to the compiled helper; note
        # that ``step`` is accepted here but is not forwarded to it.
        #
        # error: Argument 4 to "calculate_variable_window_bounds" has incompatible
        # type "Optional[bool]"; expected "bool"
        # error: Argument 6 to "calculate_variable_window_bounds" has incompatible
        # type "Optional[ndarray]"; expected "ndarray"
        bounds = calculate_variable_window_bounds(
            num_values,
            self.window_size,
            min_periods,
            center,  # type: ignore[arg-type]
            closed,
            self.index_array,  # type: ignore[arg-type]
        )
        return bounds

130 

131 

class VariableOffsetWindowIndexer(BaseIndexer):
    """Calculate window boundaries based on a non-fixed offset such as a BusinessDay."""

    def __init__(
        self,
        index_array: np.ndarray | None = None,
        window_size: int = 0,
        index=None,
        offset=None,
        **kwargs,
    ) -> None:
        """
        Parameters
        ----------
        index_array : np.ndarray or None, default None
            forwarded to the base class
        window_size : int, default 0
            forwarded to the base class
        index : default None
            positionally indexable sequence of comparable values from which
            ``offset`` can be subtracted; read by get_window_bounds
        offset : default None
            amount subtracted from each index value to obtain the window start
        **kwargs :
            keyword arguments that will be available when get_window_bounds is called
        """
        super().__init__(index_array, window_size, **kwargs)
        self.index = index
        self.offset = offset

    @Appender(get_window_bounds_doc)
    def get_window_bounds(
        self,
        num_values: int = 0,
        min_periods: int | None = None,
        center: bool | None = None,
        closed: str | None = None,
        step: int | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        if step is not None:
            raise NotImplementedError("step not implemented for variable offset window")
        if num_values <= 0:
            return np.empty(0, dtype="int64"), np.empty(0, dtype="int64")

        index = self.index

        # if windows is variable, default is 'right', otherwise default is 'both'
        if closed is None:
            closed = "right" if index is not None else "both"

        right_closed = closed in ("right", "both")
        left_closed = closed in ("left", "both")

        # -1 when the last value precedes the first (descending index), else 1.
        index_growth_sign = -1 if index[num_values - 1] < index[0] else 1

        # Loop invariants, computed once instead of on every iteration.
        signed_offset = index_growth_sign * self.offset
        zero = timedelta(0)

        # Every slot is overwritten below; -1 is only a placeholder.
        start = np.full(num_values, -1, dtype="int64")
        end = np.full(num_values, -1, dtype="int64")

        start[0] = 0
        # right endpoint closed -> first window holds row 0; open -> it is empty
        end[0] = 1 if right_closed else 0

        # start is start of slice interval (including)
        # end is end of slice interval (not including)
        for i in range(1, num_values):
            end_bound = index[i]
            start_bound = end_bound - signed_offset

            # left endpoint is closed
            if left_closed:
                start_bound -= Nano(1)

            # advance the start bound until we are
            # within the constraint
            start[i] = i
            for j in range(start[i - 1], i):
                if (index[j] - start_bound) * index_growth_sign > zero:
                    start[i] = j
                    break

            # end bound is previous end
            # or current index
            if (index[end[i - 1]] - end_bound) * index_growth_sign <= zero:
                end[i] = i + 1
            else:
                end[i] = end[i - 1]

            # right endpoint is open
            if not right_closed:
                end[i] -= 1

        return start, end

218 

219 

class ExpandingIndexer(BaseIndexer):
    """Calculate expanding window bounds, mimicking df.expanding()"""

    @Appender(get_window_bounds_doc)
    def get_window_bounds(
        self,
        num_values: int = 0,
        min_periods: int | None = None,
        center: bool | None = None,
        closed: str | None = None,
        step: int | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        # Every window begins at row 0 ...
        window_starts = np.zeros(num_values, dtype=np.int64)
        # ... and the i-th window stops (exclusively) at i + 1.
        window_ends = np.arange(1, num_values + 1, dtype=np.int64)
        return window_starts, window_ends

237 

238 

class FixedForwardWindowIndexer(BaseIndexer):
    """
    Creates window boundaries for fixed-length windows that include the current row.

    Examples
    --------
    >>> df = pd.DataFrame({'B': [0, 1, 2, np.nan, 4]})
    >>> df
         B
    0  0.0
    1  1.0
    2  2.0
    3  NaN
    4  4.0

    >>> indexer = pd.api.indexers.FixedForwardWindowIndexer(window_size=2)
    >>> df.rolling(window=indexer, min_periods=1).sum()
         B
    0  1.0
    1  3.0
    2  2.0
    3  4.0
    4  4.0
    """

    @Appender(get_window_bounds_doc)
    def get_window_bounds(
        self,
        num_values: int = 0,
        min_periods: int | None = None,
        center: bool | None = None,
        closed: str | None = None,
        step: int | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        # Reject options that are not supported for forward-looking windows.
        if center:
            raise ValueError("Forward-looking windows can't have center=True")
        if closed is not None:
            raise ValueError(
                "Forward-looking windows don't support setting the closed argument"
            )

        stride = 1 if step is None else step

        # Each window opens at its own row and extends window_size rows ahead.
        window_starts = np.arange(0, num_values, stride, dtype="int64")
        window_ends = window_starts + self.window_size
        if self.window_size:
            # Windows near the tail may not run past the last row.
            window_ends = np.clip(window_ends, 0, num_values)

        return window_starts, window_ends

289 

290 

class GroupbyIndexer(BaseIndexer):
    """Calculate bounds to compute groupby rolling, mimicking df.groupby().rolling()"""

    def __init__(
        self,
        index_array: np.ndarray | None = None,
        window_size: int | BaseIndexer = 0,
        groupby_indices: dict | None = None,
        window_indexer: type[BaseIndexer] = BaseIndexer,
        indexer_kwargs: dict | None = None,
        **kwargs,
    ) -> None:
        """
        Parameters
        ----------
        index_array : np.ndarray or None
            np.ndarray of the index of the original object that we are performing
            a chained groupby operation over. This index has been pre-sorted relative to
            the groups
        window_size : int or BaseIndexer
            window size during the windowing operation
        groupby_indices : dict or None
            dict of {group label: [positional index of rows belonging to the group]}
        window_indexer : BaseIndexer
            BaseIndexer class determining the start and end bounds of each group
        indexer_kwargs : dict or None
            Custom kwargs to be passed to window_indexer
        **kwargs :
            keyword arguments that will be available when get_window_bounds is called
        """
        self.groupby_indices = groupby_indices or {}
        self.window_indexer = window_indexer
        # Copy so that popping "window_size" below never mutates the caller's dict.
        self.indexer_kwargs = indexer_kwargs.copy() if indexer_kwargs else {}
        super().__init__(
            index_array=index_array,
            # A "window_size" entry in indexer_kwargs overrides the argument.
            window_size=self.indexer_kwargs.pop("window_size", window_size),
            **kwargs,
        )

    @Appender(get_window_bounds_doc)
    def get_window_bounds(
        self,
        num_values: int = 0,
        min_periods: int | None = None,
        center: bool | None = None,
        closed: str | None = None,
        step: int | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        # 1) For each group, get the indices that belong to the group
        # 2) Use the indices to calculate the start & end bounds of the window
        # 3) Append the window bounds in group order
        start_arrays = []
        end_arrays = []
        window_indices_start = 0
        # Only the per-group positions are needed; the group labels are unused.
        for indices in self.groupby_indices.values():
            group_size = len(indices)

            index_array: np.ndarray | None
            if self.index_array is not None:
                index_array = self.index_array.take(ensure_platform_int(indices))
            else:
                index_array = None

            indexer = self.window_indexer(
                index_array=index_array,
                window_size=self.window_size,
                **self.indexer_kwargs,
            )
            start, end = indexer.get_window_bounds(
                group_size, min_periods, center, closed, step
            )
            start = start.astype(np.int64)
            end = end.astype(np.int64)
            assert len(start) == len(
                end
            ), "these should be equal in length from get_window_bounds"

            # Cannot use groupby_indices as they might not be monotonic with the object
            # we're rolling over, so number the group's rows consecutively instead.
            # One extra trailing position is included because windows are sliced
            # like [start, end); building it in a single arange also keeps an
            # empty group from failing on a ``[-1]`` lookup.
            window_indices = np.arange(
                window_indices_start,
                window_indices_start + group_size + 1,
                dtype=np.int64,
            )
            window_indices_start += group_size

            # Translate group-local bounds into positions in the concatenated result.
            start_arrays.append(window_indices.take(ensure_platform_int(start)))
            end_arrays.append(window_indices.take(ensure_platform_int(end)))

        if not start_arrays:
            return np.array([], dtype=np.int64), np.array([], dtype=np.int64)
        return np.concatenate(start_arrays), np.concatenate(end_arrays)

383 

384 

class ExponentialMovingWindowIndexer(BaseIndexer):
    """Calculate ewm window bounds (the entire window)"""

    @Appender(get_window_bounds_doc)
    def get_window_bounds(
        self,
        num_values: int = 0,
        min_periods: int | None = None,
        center: bool | None = None,
        closed: str | None = None,
        step: int | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        # A single window spanning [0, num_values).
        window_start = np.array([0], dtype=np.int64)
        window_end = np.array([num_values], dtype=np.int64)
        return window_start, window_end

397 

398 return np.array([0], dtype=np.int64), np.array([num_values], dtype=np.int64)