Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/pandas/core/window/ewm.py: 21%

246 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1from __future__ import annotations 

2 

3import datetime 

4from functools import partial 

5from textwrap import dedent 

6from typing import ( 

7 TYPE_CHECKING, 

8 cast, 

9) 

10import warnings 

11 

12import numpy as np 

13 

14from pandas._libs.tslibs import Timedelta 

15import pandas._libs.window.aggregations as window_aggregations 

16from pandas._typing import ( 

17 Axis, 

18 TimedeltaConvertibleTypes, 

19) 

20 

21if TYPE_CHECKING: 21 ↛ 22line 21 didn't jump to line 22, because the condition on line 21 was never true

22 from pandas import DataFrame, Series 

23 from pandas.core.generic import NDFrame 

24 

25from pandas.compat.numpy import function as nv 

26from pandas.util._decorators import doc 

27from pandas.util._exceptions import find_stack_level 

28 

29from pandas.core.dtypes.common import ( 

30 is_datetime64_ns_dtype, 

31 is_numeric_dtype, 

32) 

33from pandas.core.dtypes.missing import isna 

34 

35import pandas.core.common as common # noqa: PDF018 

36from pandas.core.indexers.objects import ( 

37 BaseIndexer, 

38 ExponentialMovingWindowIndexer, 

39 GroupbyIndexer, 

40) 

41from pandas.core.util.numba_ import ( 

42 get_jit_arguments, 

43 maybe_use_numba, 

44) 

45from pandas.core.window.common import ( 

46 maybe_warn_args_and_kwargs, 

47 zsqrt, 

48) 

49from pandas.core.window.doc import ( 

50 _shared_docs, 

51 args_compat, 

52 create_section_header, 

53 kwargs_compat, 

54 kwargs_numeric_only, 

55 numba_notes, 

56 template_header, 

57 template_returns, 

58 template_see_also, 

59 window_agg_numba_parameters, 

60) 

61from pandas.core.window.numba_ import ( 

62 generate_numba_ewm_func, 

63 generate_numba_ewm_table_func, 

64) 

65from pandas.core.window.online import ( 

66 EWMMeanState, 

67 generate_online_numba_ewma_func, 

68) 

69from pandas.core.window.rolling import ( 

70 BaseWindow, 

71 BaseWindowGroupby, 

72) 

73 

74 

def get_center_of_mass(
    comass: float | None,
    span: float | None,
    halflife: float | None,
    alpha: float | None,
) -> float:
    """
    Convert a single decay specification into its center-of-mass equivalent.

    Parameters
    ----------
    comass : float or None
        Center of mass; must satisfy ``comass >= 0``.
    span : float or None
        Span; must satisfy ``span >= 1``.
    halflife : float or None
        Half-life; must satisfy ``halflife > 0``.
    alpha : float or None
        Smoothing factor; must satisfy ``0 < alpha <= 1``.

    Returns
    -------
    float
        Center of mass implied by whichever argument was supplied.

    Raises
    ------
    ValueError
        If more than one argument is supplied, if none is supplied, or if the
        supplied value lies outside its valid domain.
    """
    if common.count_not_none(comass, span, halflife, alpha) > 1:
        raise ValueError("comass, span, halflife, and alpha are mutually exclusive")

    # At most one argument is set from here on. Each branch validates its
    # argument and returns the equivalent center of mass; the domain checks
    # ensure 0 < alpha <= 1.
    if comass is not None:
        if comass < 0:
            raise ValueError("comass must satisfy: comass >= 0")
        return float(comass)

    if span is not None:
        if span < 1:
            raise ValueError("span must satisfy: span >= 1")
        return float((span - 1) / 2)

    if halflife is not None:
        if halflife <= 0:
            raise ValueError("halflife must satisfy: halflife > 0")
        decay = 1 - np.exp(np.log(0.5) / halflife)
        return float(1 / decay - 1)

    if alpha is not None:
        if alpha <= 0 or alpha > 1:
            raise ValueError("alpha must satisfy: 0 < alpha <= 1")
        return float((1 - alpha) / alpha)

    raise ValueError("Must pass one of comass, span, halflife, or alpha")

106 

107 

108def _calculate_deltas( 

109 times: str | np.ndarray | NDFrame | None, 

110 halflife: float | TimedeltaConvertibleTypes | None, 

111) -> np.ndarray: 

112 """ 

113 Return the diff of the times divided by the half-life. These values are used in 

114 the calculation of the ewm mean. 

115 

116 Parameters 

117 ---------- 

118 times : str, np.ndarray, Series, default None 

119 Times corresponding to the observations. Must be monotonically increasing 

120 and ``datetime64[ns]`` dtype. 

121 halflife : float, str, timedelta, optional 

122 Half-life specifying the decay 

123 

124 Returns 

125 ------- 

126 np.ndarray 

127 Diff of the times divided by the half-life 

128 """ 

129 # error: Item "str" of "Union[str, ndarray, NDFrameT, None]" has no 

130 # attribute "view" 

131 # error: Item "None" of "Union[str, ndarray, NDFrameT, None]" has no 

132 # attribute "view" 

133 _times = np.asarray( 

134 times.view(np.int64), dtype=np.float64 # type: ignore[union-attr] 

135 ) 

136 _halflife = float(Timedelta(halflife).value) 

137 return np.diff(_times) / _halflife 

138 

139 

140class ExponentialMovingWindow(BaseWindow): 

141 r""" 

142 Provide exponentially weighted (EW) calculations. 

143 

144 Exactly one of ``com``, ``span``, ``halflife``, or ``alpha`` must be 

145 provided if ``times`` is not provided. If ``times`` is provided, 

146 ``halflife`` and one of ``com``, ``span`` or ``alpha`` may be provided. 

147 

148 Parameters 

149 ---------- 

150 com : float, optional 

151 Specify decay in terms of center of mass 

152 

153 :math:`\alpha = 1 / (1 + com)`, for :math:`com \geq 0`. 

154 

155 span : float, optional 

156 Specify decay in terms of span 

157 

158 :math:`\alpha = 2 / (span + 1)`, for :math:`span \geq 1`. 

159 

160 halflife : float, str, timedelta, optional 

161 Specify decay in terms of half-life 

162 

163 :math:`\alpha = 1 - \exp\left(-\ln(2) / halflife\right)`, for 

164 :math:`halflife > 0`. 

165 

166 If ``times`` is specified, a timedelta convertible unit over which an 

167 observation decays to half its value. Only applicable to ``mean()``, 

168 and halflife value will not apply to the other functions. 

169 

170 .. versionadded:: 1.1.0 

171 

172 alpha : float, optional 

173 Specify smoothing factor :math:`\alpha` directly 

174 

175 :math:`0 < \alpha \leq 1`. 

176 

177 min_periods : int, default 0 

178 Minimum number of observations in window required to have a value; 

179 otherwise, result is ``np.nan``. 

180 

181 adjust : bool, default True 

182 Divide by decaying adjustment factor in beginning periods to account 

183 for imbalance in relative weightings (viewing EWMA as a moving average). 

184 

185 - When ``adjust=True`` (default), the EW function is calculated using weights 

186 :math:`w_i = (1 - \alpha)^i`. For example, the EW moving average of the series 

187 [:math:`x_0, x_1, ..., x_t`] would be: 

188 

189 .. math:: 

190 y_t = \frac{x_t + (1 - \alpha)x_{t-1} + (1 - \alpha)^2 x_{t-2} + ... + (1 - 

191 \alpha)^t x_0}{1 + (1 - \alpha) + (1 - \alpha)^2 + ... + (1 - \alpha)^t} 

192 

193 - When ``adjust=False``, the exponentially weighted function is calculated 

194 recursively: 

195 

196 .. math:: 

197 \begin{split} 

198 y_0 &= x_0\\ 

199 y_t &= (1 - \alpha) y_{t-1} + \alpha x_t, 

200 \end{split} 

201 ignore_na : bool, default False 

202 Ignore missing values when calculating weights. 

203 

204 - When ``ignore_na=False`` (default), weights are based on absolute positions. 

205 For example, the weights of :math:`x_0` and :math:`x_2` used in calculating 

206 the final weighted average of [:math:`x_0`, None, :math:`x_2`] are 

207 :math:`(1-\alpha)^2` and :math:`1` if ``adjust=True``, and 

208 :math:`(1-\alpha)^2` and :math:`\alpha` if ``adjust=False``. 

209 

210 - When ``ignore_na=True``, weights are based 

211 on relative positions. For example, the weights of :math:`x_0` and :math:`x_2` 

212 used in calculating the final weighted average of 

213 [:math:`x_0`, None, :math:`x_2`] are :math:`1-\alpha` and :math:`1` if 

214 ``adjust=True``, and :math:`1-\alpha` and :math:`\alpha` if ``adjust=False``. 

215 

216 axis : {0, 1}, default 0 

217 If ``0`` or ``'index'``, calculate across the rows. 

218 

219 If ``1`` or ``'columns'``, calculate across the columns. 

220 

221 For `Series` this parameter is unused and defaults to 0. 

222 

223 times : str, np.ndarray, Series, default None 

224 

225 .. versionadded:: 1.1.0 

226 

227 Only applicable to ``mean()``. 

228 

229 Times corresponding to the observations. Must be monotonically increasing and 

230 ``datetime64[ns]`` dtype. 

231 

232 If 1-D array like, a sequence with the same shape as the observations. 

233 

234 .. deprecated:: 1.4.0 

235 If str, the name of the column in the DataFrame representing the times. 

236 

237 method : str {'single', 'table'}, default 'single' 

238 .. versionadded:: 1.4.0 

239 

240 Execute the rolling operation per single column or row (``'single'``) 

241 or over the entire object (``'table'``). 

242 

243 This argument is only implemented when specifying ``engine='numba'`` 

244 in the method call. 

245 

246 Only applicable to ``mean()`` 

247 

248 Returns 

249 ------- 

250 ``ExponentialMovingWindow`` subclass 

251 

252 See Also 

253 -------- 

254 rolling : Provides rolling window calculations. 

255 expanding : Provides expanding transformations. 

256 

257 Notes 

258 ----- 

259 See :ref:`Windowing Operations <window.exponentially_weighted>` 

260 for further usage details and examples. 

261 

262 Examples 

263 -------- 

264 >>> df = pd.DataFrame({'B': [0, 1, 2, np.nan, 4]}) 

265 >>> df 

266 B 

267 0 0.0 

268 1 1.0 

269 2 2.0 

270 3 NaN 

271 4 4.0 

272 

273 >>> df.ewm(com=0.5).mean() 

274 B 

275 0 0.000000 

276 1 0.750000 

277 2 1.615385 

278 3 1.615385 

279 4 3.670213 

280 >>> df.ewm(alpha=2 / 3).mean() 

281 B 

282 0 0.000000 

283 1 0.750000 

284 2 1.615385 

285 3 1.615385 

286 4 3.670213 

287 

288 **adjust** 

289 

290 >>> df.ewm(com=0.5, adjust=True).mean() 

291 B 

292 0 0.000000 

293 1 0.750000 

294 2 1.615385 

295 3 1.615385 

296 4 3.670213 

297 >>> df.ewm(com=0.5, adjust=False).mean() 

298 B 

299 0 0.000000 

300 1 0.666667 

301 2 1.555556 

302 3 1.555556 

303 4 3.650794 

304 

305 **ignore_na** 

306 

307 >>> df.ewm(com=0.5, ignore_na=True).mean() 

308 B 

309 0 0.000000 

310 1 0.750000 

311 2 1.615385 

312 3 1.615385 

313 4 3.225000 

314 >>> df.ewm(com=0.5, ignore_na=False).mean() 

315 B 

316 0 0.000000 

317 1 0.750000 

318 2 1.615385 

319 3 1.615385 

320 4 3.670213 

321 

322 **times** 

323 

324 Exponentially weighted mean with weights calculated with a timedelta ``halflife`` 

325 relative to ``times``. 

326 

327 >>> times = ['2020-01-01', '2020-01-03', '2020-01-10', '2020-01-15', '2020-01-17'] 

328 >>> df.ewm(halflife='4 days', times=pd.DatetimeIndex(times)).mean() 

329 B 

330 0 0.000000 

331 1 0.585786 

332 2 1.523889 

333 3 1.523889 

334 4 3.233686 

335 """ 

336 

337 _attributes = [ 

338 "com", 

339 "span", 

340 "halflife", 

341 "alpha", 

342 "min_periods", 

343 "adjust", 

344 "ignore_na", 

345 "axis", 

346 "times", 

347 "method", 

348 ] 

349 

350 def __init__( 

351 self, 

352 obj: NDFrame, 

353 com: float | None = None, 

354 span: float | None = None, 

355 halflife: float | TimedeltaConvertibleTypes | None = None, 

356 alpha: float | None = None, 

357 min_periods: int | None = 0, 

358 adjust: bool = True, 

359 ignore_na: bool = False, 

360 axis: Axis = 0, 

361 times: str | np.ndarray | NDFrame | None = None, 

362 method: str = "single", 

363 *, 

364 selection=None, 

365 ) -> None: 

366 super().__init__( 

367 obj=obj, 

368 min_periods=1 if min_periods is None else max(int(min_periods), 1), 

369 on=None, 

370 center=False, 

371 closed=None, 

372 method=method, 

373 axis=axis, 

374 selection=selection, 

375 ) 

376 self.com = com 

377 self.span = span 

378 self.halflife = halflife 

379 self.alpha = alpha 

380 self.adjust = adjust 

381 self.ignore_na = ignore_na 

382 self.times = times 

383 if self.times is not None: 

384 if not self.adjust: 

385 raise NotImplementedError("times is not supported with adjust=False.") 

386 if isinstance(self.times, str): 

387 warnings.warn( 

388 ( 

389 "Specifying times as a string column label is deprecated " 

390 "and will be removed in a future version. Pass the column " 

391 "into times instead." 

392 ), 

393 FutureWarning, 

394 stacklevel=find_stack_level(), 

395 ) 

396 # self.times cannot be str anymore 

397 self.times = cast("Series", self._selected_obj[self.times]) 

398 if not is_datetime64_ns_dtype(self.times): 

399 raise ValueError("times must be datetime64[ns] dtype.") 

400 if len(self.times) != len(obj): 

401 raise ValueError("times must be the same length as the object.") 

402 if not isinstance(self.halflife, (str, datetime.timedelta, np.timedelta64)): 

403 raise ValueError("halflife must be a timedelta convertible object") 

404 if isna(self.times).any(): 

405 raise ValueError("Cannot convert NaT values to integer") 

406 self._deltas = _calculate_deltas(self.times, self.halflife) 

407 # Halflife is no longer applicable when calculating COM 

408 # But allow COM to still be calculated if the user passes other decay args 

409 if common.count_not_none(self.com, self.span, self.alpha) > 0: 

410 self._com = get_center_of_mass(self.com, self.span, None, self.alpha) 

411 else: 

412 self._com = 1.0 

413 else: 

414 if self.halflife is not None and isinstance( 

415 self.halflife, (str, datetime.timedelta, np.timedelta64) 

416 ): 

417 raise ValueError( 

418 "halflife can only be a timedelta convertible argument if " 

419 "times is not None." 

420 ) 

421 # Without times, points are equally spaced 

422 self._deltas = np.ones( 

423 max(self.obj.shape[self.axis] - 1, 0), dtype=np.float64 

424 ) 

425 self._com = get_center_of_mass( 

426 # error: Argument 3 to "get_center_of_mass" has incompatible type 

427 # "Union[float, Any, None, timedelta64, signedinteger[_64Bit]]"; 

428 # expected "Optional[float]" 

429 self.com, 

430 self.span, 

431 self.halflife, # type: ignore[arg-type] 

432 self.alpha, 

433 ) 

434 

435 def _check_window_bounds( 

436 self, start: np.ndarray, end: np.ndarray, num_vals: int 

437 ) -> None: 

438 # emw algorithms are iterative with each point 

439 # ExponentialMovingWindowIndexer "bounds" are the entire window 

440 pass 

441 

442 def _get_window_indexer(self) -> BaseIndexer: 

443 """ 

444 Return an indexer class that will compute the window start and end bounds 

445 """ 

446 return ExponentialMovingWindowIndexer() 

447 

448 def online( 

449 self, engine="numba", engine_kwargs=None 

450 ) -> OnlineExponentialMovingWindow: 

451 """ 

452 Return an ``OnlineExponentialMovingWindow`` object to calculate 

453 exponentially moving window aggregations in an online method. 

454 

455 .. versionadded:: 1.3.0 

456 

457 Parameters 

458 ---------- 

459 engine: str, default ``'numba'`` 

460 Execution engine to calculate online aggregations. 

461 Applies to all supported aggregation methods. 

462 

463 engine_kwargs : dict, default None 

464 Applies to all supported aggregation methods. 

465 

466 * For ``'numba'`` engine, the engine can accept ``nopython``, ``nogil`` 

467 and ``parallel`` dictionary keys. The values must either be ``True`` or 

468 ``False``. The default ``engine_kwargs`` for the ``'numba'`` engine is 

469 ``{{'nopython': True, 'nogil': False, 'parallel': False}}`` and will be 

470 applied to the function 

471 

472 Returns 

473 ------- 

474 OnlineExponentialMovingWindow 

475 """ 

476 return OnlineExponentialMovingWindow( 

477 obj=self.obj, 

478 com=self.com, 

479 span=self.span, 

480 halflife=self.halflife, 

481 alpha=self.alpha, 

482 min_periods=self.min_periods, 

483 adjust=self.adjust, 

484 ignore_na=self.ignore_na, 

485 axis=self.axis, 

486 times=self.times, 

487 engine=engine, 

488 engine_kwargs=engine_kwargs, 

489 selection=self._selection, 

490 ) 

491 

492 @doc( 

493 _shared_docs["aggregate"], 

494 see_also=dedent( 

495 """ 

496 See Also 

497 -------- 

498 pandas.DataFrame.rolling.aggregate 

499 """ 

500 ), 

501 examples=dedent( 

502 """ 

503 Examples 

504 -------- 

505 >>> df = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6], "C": [7, 8, 9]}) 

506 >>> df 

507 A B C 

508 0 1 4 7 

509 1 2 5 8 

510 2 3 6 9 

511 

512 >>> df.ewm(alpha=0.5).mean() 

513 A B C 

514 0 1.000000 4.000000 7.000000 

515 1 1.666667 4.666667 7.666667 

516 2 2.428571 5.428571 8.428571 

517 """ 

518 ), 

519 klass="Series/Dataframe", 

520 axis="", 

521 ) 

    def aggregate(self, func, *args, **kwargs):
        # The documentation for this method is assembled by the ``@doc``
        # decorator applied to it, so no inline docstring is written here.
        # The call itself is a pure pass-through to the parent class.
        return super().aggregate(func, *args, **kwargs)

524 

525 agg = aggregate 

526 

527 @doc( 

528 template_header, 

529 create_section_header("Parameters"), 

530 kwargs_numeric_only, 

531 args_compat, 

532 window_agg_numba_parameters(), 

533 kwargs_compat, 

534 create_section_header("Returns"), 

535 template_returns, 

536 create_section_header("See Also"), 

537 template_see_also, 

538 create_section_header("Notes"), 

539 numba_notes.replace("\n", "", 1), 

540 window_method="ewm", 

541 aggregation_description="(exponential weighted moment) mean", 

542 agg_method="mean", 

543 ) 

544 def mean( 

545 self, 

546 numeric_only: bool = False, 

547 *args, 

548 engine=None, 

549 engine_kwargs=None, 

550 **kwargs, 

551 ): 

552 maybe_warn_args_and_kwargs(type(self), "mean", args, kwargs) 

553 if maybe_use_numba(engine): 

554 if self.method == "single": 

555 func = generate_numba_ewm_func 

556 else: 

557 func = generate_numba_ewm_table_func 

558 ewm_func = func( 

559 **get_jit_arguments(engine_kwargs), 

560 com=self._com, 

561 adjust=self.adjust, 

562 ignore_na=self.ignore_na, 

563 deltas=tuple(self._deltas), 

564 normalize=True, 

565 ) 

566 return self._apply(ewm_func, name="mean") 

567 elif engine in ("cython", None): 

568 if engine_kwargs is not None: 

569 raise ValueError("cython engine does not accept engine_kwargs") 

570 nv.validate_window_func("mean", args, kwargs) 

571 

572 deltas = None if self.times is None else self._deltas 

573 window_func = partial( 

574 window_aggregations.ewm, 

575 com=self._com, 

576 adjust=self.adjust, 

577 ignore_na=self.ignore_na, 

578 deltas=deltas, 

579 normalize=True, 

580 ) 

581 return self._apply(window_func, name="mean", numeric_only=numeric_only) 

582 else: 

583 raise ValueError("engine must be either 'numba' or 'cython'") 

584 

585 @doc( 

586 template_header, 

587 create_section_header("Parameters"), 

588 kwargs_numeric_only, 

589 args_compat, 

590 window_agg_numba_parameters(), 

591 kwargs_compat, 

592 create_section_header("Returns"), 

593 template_returns, 

594 create_section_header("See Also"), 

595 template_see_also, 

596 create_section_header("Notes"), 

597 numba_notes.replace("\n", "", 1), 

598 window_method="ewm", 

599 aggregation_description="(exponential weighted moment) sum", 

600 agg_method="sum", 

601 ) 

602 def sum( 

603 self, 

604 numeric_only: bool = False, 

605 *args, 

606 engine=None, 

607 engine_kwargs=None, 

608 **kwargs, 

609 ): 

610 maybe_warn_args_and_kwargs(type(self), "sum", args, kwargs) 

611 if not self.adjust: 

612 raise NotImplementedError("sum is not implemented with adjust=False") 

613 if maybe_use_numba(engine): 

614 if self.method == "single": 

615 func = generate_numba_ewm_func 

616 else: 

617 func = generate_numba_ewm_table_func 

618 ewm_func = func( 

619 **get_jit_arguments(engine_kwargs), 

620 com=self._com, 

621 adjust=self.adjust, 

622 ignore_na=self.ignore_na, 

623 deltas=tuple(self._deltas), 

624 normalize=False, 

625 ) 

626 return self._apply(ewm_func, name="sum") 

627 elif engine in ("cython", None): 

628 if engine_kwargs is not None: 

629 raise ValueError("cython engine does not accept engine_kwargs") 

630 nv.validate_window_func("sum", args, kwargs) 

631 

632 deltas = None if self.times is None else self._deltas 

633 window_func = partial( 

634 window_aggregations.ewm, 

635 com=self._com, 

636 adjust=self.adjust, 

637 ignore_na=self.ignore_na, 

638 deltas=deltas, 

639 normalize=False, 

640 ) 

641 return self._apply(window_func, name="sum", numeric_only=numeric_only) 

642 else: 

643 raise ValueError("engine must be either 'numba' or 'cython'") 

644 

645 @doc( 

646 template_header, 

647 create_section_header("Parameters"), 

648 dedent( 

649 """ 

650 bias : bool, default False 

651 Use a standard estimation bias correction. 

652 """ 

653 ).replace("\n", "", 1), 

654 kwargs_numeric_only, 

655 args_compat, 

656 kwargs_compat, 

657 create_section_header("Returns"), 

658 template_returns, 

659 create_section_header("See Also"), 

660 template_see_also[:-1], 

661 window_method="ewm", 

662 aggregation_description="(exponential weighted moment) standard deviation", 

663 agg_method="std", 

664 ) 

665 def std(self, bias: bool = False, numeric_only: bool = False, *args, **kwargs): 

666 maybe_warn_args_and_kwargs(type(self), "std", args, kwargs) 

667 nv.validate_window_func("std", args, kwargs) 

668 if ( 

669 numeric_only 

670 and self._selected_obj.ndim == 1 

671 and not is_numeric_dtype(self._selected_obj.dtype) 

672 ): 

673 # Raise directly so error message says std instead of var 

674 raise NotImplementedError( 

675 f"{type(self).__name__}.std does not implement numeric_only" 

676 ) 

677 return zsqrt(self.var(bias=bias, numeric_only=numeric_only, **kwargs)) 

678 

679 def vol(self, bias: bool = False, *args, **kwargs): 

680 warnings.warn( 

681 ( 

682 "vol is deprecated will be removed in a future version. " 

683 "Use std instead." 

684 ), 

685 FutureWarning, 

686 stacklevel=find_stack_level(), 

687 ) 

688 return self.std(bias, *args, **kwargs) 

689 

690 @doc( 

691 template_header, 

692 create_section_header("Parameters"), 

693 dedent( 

694 """ 

695 bias : bool, default False 

696 Use a standard estimation bias correction. 

697 """ 

698 ).replace("\n", "", 1), 

699 kwargs_numeric_only, 

700 args_compat, 

701 kwargs_compat, 

702 create_section_header("Returns"), 

703 template_returns, 

704 create_section_header("See Also"), 

705 template_see_also[:-1], 

706 window_method="ewm", 

707 aggregation_description="(exponential weighted moment) variance", 

708 agg_method="var", 

709 ) 

710 def var(self, bias: bool = False, numeric_only: bool = False, *args, **kwargs): 

711 maybe_warn_args_and_kwargs(type(self), "var", args, kwargs) 

712 nv.validate_window_func("var", args, kwargs) 

713 window_func = window_aggregations.ewmcov 

714 wfunc = partial( 

715 window_func, 

716 com=self._com, 

717 adjust=self.adjust, 

718 ignore_na=self.ignore_na, 

719 bias=bias, 

720 ) 

721 

722 def var_func(values, begin, end, min_periods): 

723 return wfunc(values, begin, end, min_periods, values) 

724 

725 return self._apply(var_func, name="var", numeric_only=numeric_only) 

726 

727 @doc( 

728 template_header, 

729 create_section_header("Parameters"), 

730 dedent( 

731 """ 

732 other : Series or DataFrame , optional 

733 If not supplied then will default to self and produce pairwise 

734 output. 

735 pairwise : bool, default None 

736 If False then only matching columns between self and other will be 

737 used and the output will be a DataFrame. 

738 If True then all pairwise combinations will be calculated and the 

739 output will be a MultiIndex DataFrame in the case of DataFrame 

740 inputs. In the case of missing elements, only complete pairwise 

741 observations will be used. 

742 bias : bool, default False 

743 Use a standard estimation bias correction. 

744 """ 

745 ).replace("\n", "", 1), 

746 kwargs_numeric_only, 

747 kwargs_compat, 

748 create_section_header("Returns"), 

749 template_returns, 

750 create_section_header("See Also"), 

751 template_see_also[:-1], 

752 window_method="ewm", 

753 aggregation_description="(exponential weighted moment) sample covariance", 

754 agg_method="cov", 

755 ) 

756 def cov( 

757 self, 

758 other: DataFrame | Series | None = None, 

759 pairwise: bool | None = None, 

760 bias: bool = False, 

761 numeric_only: bool = False, 

762 **kwargs, 

763 ): 

764 from pandas import Series 

765 

766 maybe_warn_args_and_kwargs(type(self), "cov", None, kwargs) 

767 self._validate_numeric_only("cov", numeric_only) 

768 

769 def cov_func(x, y): 

770 x_array = self._prep_values(x) 

771 y_array = self._prep_values(y) 

772 window_indexer = self._get_window_indexer() 

773 min_periods = ( 

774 self.min_periods 

775 if self.min_periods is not None 

776 else window_indexer.window_size 

777 ) 

778 start, end = window_indexer.get_window_bounds( 

779 num_values=len(x_array), 

780 min_periods=min_periods, 

781 center=self.center, 

782 closed=self.closed, 

783 step=self.step, 

784 ) 

785 result = window_aggregations.ewmcov( 

786 x_array, 

787 start, 

788 end, 

789 # error: Argument 4 to "ewmcov" has incompatible type 

790 # "Optional[int]"; expected "int" 

791 self.min_periods, # type: ignore[arg-type] 

792 y_array, 

793 self._com, 

794 self.adjust, 

795 self.ignore_na, 

796 bias, 

797 ) 

798 return Series(result, index=x.index, name=x.name) 

799 

800 return self._apply_pairwise( 

801 self._selected_obj, other, pairwise, cov_func, numeric_only 

802 ) 

803 

804 @doc( 

805 template_header, 

806 create_section_header("Parameters"), 

807 dedent( 

808 """ 

809 other : Series or DataFrame, optional 

810 If not supplied then will default to self and produce pairwise 

811 output. 

812 pairwise : bool, default None 

813 If False then only matching columns between self and other will be 

814 used and the output will be a DataFrame. 

815 If True then all pairwise combinations will be calculated and the 

816 output will be a MultiIndex DataFrame in the case of DataFrame 

817 inputs. In the case of missing elements, only complete pairwise 

818 observations will be used. 

819 """ 

820 ).replace("\n", "", 1), 

821 kwargs_numeric_only, 

822 kwargs_compat, 

823 create_section_header("Returns"), 

824 template_returns, 

825 create_section_header("See Also"), 

826 template_see_also[:-1], 

827 window_method="ewm", 

828 aggregation_description="(exponential weighted moment) sample correlation", 

829 agg_method="corr", 

830 ) 

831 def corr( 

832 self, 

833 other: DataFrame | Series | None = None, 

834 pairwise: bool | None = None, 

835 numeric_only: bool = False, 

836 **kwargs, 

837 ): 

838 from pandas import Series 

839 

840 maybe_warn_args_and_kwargs(type(self), "corr", None, kwargs) 

841 self._validate_numeric_only("corr", numeric_only) 

842 

843 def cov_func(x, y): 

844 x_array = self._prep_values(x) 

845 y_array = self._prep_values(y) 

846 window_indexer = self._get_window_indexer() 

847 min_periods = ( 

848 self.min_periods 

849 if self.min_periods is not None 

850 else window_indexer.window_size 

851 ) 

852 start, end = window_indexer.get_window_bounds( 

853 num_values=len(x_array), 

854 min_periods=min_periods, 

855 center=self.center, 

856 closed=self.closed, 

857 step=self.step, 

858 ) 

859 

860 def _cov(X, Y): 

861 return window_aggregations.ewmcov( 

862 X, 

863 start, 

864 end, 

865 min_periods, 

866 Y, 

867 self._com, 

868 self.adjust, 

869 self.ignore_na, 

870 True, 

871 ) 

872 

873 with np.errstate(all="ignore"): 

874 cov = _cov(x_array, y_array) 

875 x_var = _cov(x_array, x_array) 

876 y_var = _cov(y_array, y_array) 

877 result = cov / zsqrt(x_var * y_var) 

878 return Series(result, index=x.index, name=x.name) 

879 

880 return self._apply_pairwise( 

881 self._selected_obj, other, pairwise, cov_func, numeric_only 

882 ) 

883 

884 

class ExponentialMovingWindowGroupby(BaseWindowGroupby, ExponentialMovingWindow):
    """
    Provide an exponential moving window groupby implementation.
    """

    _attributes = ExponentialMovingWindow._attributes + BaseWindowGroupby._attributes

    def __init__(self, obj, *args, _grouper=None, **kwargs) -> None:
        super().__init__(obj, *args, _grouper=_grouper, **kwargs)

        # Nothing to recompute for an empty object or when no times were given.
        if obj.empty or self.times is None:
            return

        # Sort the times and recalculate the deltas according to the groups:
        # the per-group position arrays are concatenated and the times are
        # taken in that order.
        positions_per_group = list(self._grouper.indices.values())
        groupby_order = np.concatenate(positions_per_group)
        grouped_times = self.times.take(groupby_order)  # type: ignore[union-attr]
        self._deltas = _calculate_deltas(grouped_times, self.halflife)

    def _get_window_indexer(self) -> GroupbyIndexer:
        """
        Build the indexer that computes the window start and end bounds.

        Returns
        -------
        GroupbyIndexer
            Indexer constructed from the grouper's indices, with
            ``ExponentialMovingWindowIndexer`` as its ``window_indexer``.
        """
        return GroupbyIndexer(
            groupby_indices=self._grouper.indices,
            window_indexer=ExponentialMovingWindowIndexer,
        )

916 

917 

918class OnlineExponentialMovingWindow(ExponentialMovingWindow): 

919 def __init__( 

920 self, 

921 obj: NDFrame, 

922 com: float | None = None, 

923 span: float | None = None, 

924 halflife: float | TimedeltaConvertibleTypes | None = None, 

925 alpha: float | None = None, 

926 min_periods: int | None = 0, 

927 adjust: bool = True, 

928 ignore_na: bool = False, 

929 axis: Axis = 0, 

930 times: str | np.ndarray | NDFrame | None = None, 

931 engine: str = "numba", 

932 engine_kwargs: dict[str, bool] | None = None, 

933 *, 

934 selection=None, 

935 ) -> None: 

936 if times is not None: 

937 raise NotImplementedError( 

938 "times is not implemented with online operations." 

939 ) 

940 super().__init__( 

941 obj=obj, 

942 com=com, 

943 span=span, 

944 halflife=halflife, 

945 alpha=alpha, 

946 min_periods=min_periods, 

947 adjust=adjust, 

948 ignore_na=ignore_na, 

949 axis=axis, 

950 times=times, 

951 selection=selection, 

952 ) 

953 self._mean = EWMMeanState( 

954 self._com, self.adjust, self.ignore_na, self.axis, obj.shape 

955 ) 

956 if maybe_use_numba(engine): 

957 self.engine = engine 

958 self.engine_kwargs = engine_kwargs 

959 else: 

960 raise ValueError("'numba' is the only supported engine") 

961 

962 def reset(self) -> None: 

963 """ 

964 Reset the state captured by `update` calls. 

965 """ 

966 self._mean.reset() 

967 

968 def aggregate(self, func, *args, **kwargs): 

969 return NotImplementedError 

970 

971 def std(self, bias: bool = False, *args, **kwargs): 

972 return NotImplementedError 

973 

974 def corr( 

975 self, 

976 other: DataFrame | Series | None = None, 

977 pairwise: bool | None = None, 

978 numeric_only: bool = False, 

979 **kwargs, 

980 ): 

981 return NotImplementedError 

982 

983 def cov( 

984 self, 

985 other: DataFrame | Series | None = None, 

986 pairwise: bool | None = None, 

987 bias: bool = False, 

988 numeric_only: bool = False, 

989 **kwargs, 

990 ): 

991 return NotImplementedError 

992 

993 def var(self, bias: bool = False, *args, **kwargs): 

994 return NotImplementedError 

995 

996 def mean(self, *args, update=None, update_times=None, **kwargs): 

997 """ 

998 Calculate an online exponentially weighted mean. 

999 

1000 Parameters 

1001 ---------- 

1002 update: DataFrame or Series, default None 

1003 New values to continue calculating the 

1004 exponentially weighted mean from the last values and weights. 

1005 Values should be float64 dtype. 

1006 

1007 ``update`` needs to be ``None`` the first time the 

1008 exponentially weighted mean is calculated. 

1009 

1010 update_times: Series or 1-D np.ndarray, default None 

1011 New times to continue calculating the 

1012 exponentially weighted mean from the last values and weights. 

1013 If ``None``, values are assumed to be evenly spaced 

1014 in time. 

1015 This feature is currently unsupported. 

1016 

1017 Returns 

1018 ------- 

1019 DataFrame or Series 

1020 

1021 Examples 

1022 -------- 

1023 >>> df = pd.DataFrame({"a": range(5), "b": range(5, 10)}) 

1024 >>> online_ewm = df.head(2).ewm(0.5).online() 

1025 >>> online_ewm.mean() 

1026 a b 

1027 0 0.00 5.00 

1028 1 0.75 5.75 

1029 >>> online_ewm.mean(update=df.tail(3)) 

1030 a b 

1031 2 1.615385 6.615385 

1032 3 2.550000 7.550000 

1033 4 3.520661 8.520661 

1034 >>> online_ewm.reset() 

1035 >>> online_ewm.mean() 

1036 a b 

1037 0 0.00 5.00 

1038 1 0.75 5.75 

1039 """ 

1040 result_kwargs = {} 

1041 is_frame = True if self._selected_obj.ndim == 2 else False 

1042 if update_times is not None: 

1043 raise NotImplementedError("update_times is not implemented.") 

1044 else: 

1045 update_deltas = np.ones( 

1046 max(self._selected_obj.shape[self.axis - 1] - 1, 0), dtype=np.float64 

1047 ) 

1048 if update is not None: 

1049 if self._mean.last_ewm is None: 

1050 raise ValueError( 

1051 "Must call mean with update=None first before passing update" 

1052 ) 

1053 result_from = 1 

1054 result_kwargs["index"] = update.index 

1055 if is_frame: 

1056 last_value = self._mean.last_ewm[np.newaxis, :] 

1057 result_kwargs["columns"] = update.columns 

1058 else: 

1059 last_value = self._mean.last_ewm 

1060 result_kwargs["name"] = update.name 

1061 np_array = np.concatenate((last_value, update.to_numpy())) 

1062 else: 

1063 result_from = 0 

1064 result_kwargs["index"] = self._selected_obj.index 

1065 if is_frame: 

1066 result_kwargs["columns"] = self._selected_obj.columns 

1067 else: 

1068 result_kwargs["name"] = self._selected_obj.name 

1069 np_array = self._selected_obj.astype(np.float64).to_numpy() 

1070 ewma_func = generate_online_numba_ewma_func( 

1071 **get_jit_arguments(self.engine_kwargs) 

1072 ) 

1073 result = self._mean.run_ewm( 

1074 np_array if is_frame else np_array[:, np.newaxis], 

1075 update_deltas, 

1076 self.min_periods, 

1077 ewma_func, 

1078 ) 

1079 if not is_frame: 

1080 result = result.squeeze() 

1081 result = result[result_from:] 

1082 result = self._selected_obj._constructor(result, **result_kwargs) 

1083 return result