Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/pandas/core/window/online.py: 15%

52 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1from __future__ import annotations 

2 

3from typing import TYPE_CHECKING 

4 

5import numpy as np 

6 

7from pandas.compat._optional import import_optional_dependency 

8 

9 

def generate_online_numba_ewma_func(
    nopython: bool,
    nogil: bool,
    parallel: bool,
):
    """
    Generate a numba jitted online ewma function specified by values
    from engine_kwargs.

    Parameters
    ----------
    nopython : bool
        nopython to be passed into numba.jit
    nogil : bool
        nogil to be passed into numba.jit
    parallel : bool
        parallel to be passed into numba.jit

    Returns
    -------
    Numba function
        The jitted ``online_ewma`` kernel. It returns ``(result, old_wt)``
        where ``result`` has the same shape as ``values`` and ``old_wt`` is
        the (in-place updated) weight array that was passed in.

    Raises
    ------
    ImportError
        Propagated from ``import_optional_dependency`` when numba cannot be
        imported.
    """
    if TYPE_CHECKING:
        import numba
    else:
        numba = import_optional_dependency("numba")

    @numba.jit(nopython=nopython, nogil=nogil, parallel=parallel)
    def online_ewma(
        values: np.ndarray,
        deltas: np.ndarray,
        minimum_periods: int,
        old_wt_factor: float,
        new_wt: float,
        old_wt: np.ndarray,
        adjust: bool,
        ignore_na: bool,
    ):
        """
        Compute online exponentially weighted mean per column over 2D values.

        Takes the first observation as is, then computes the subsequent
        exponentially weighted mean accounting minimum periods.

        ``old_wt`` is updated in place and also returned so the caller can
        carry it over to the next call. ``values`` is left untouched.
        """
        result = np.empty(values.shape)
        # Work on a copy of the first row: ``values[0]`` is a view, and the
        # running average is overwritten element by element below, which
        # would otherwise clobber the caller's input array.
        weighted_avg = values[0].copy()
        # Per-column count of non-NaN observations seen so far.
        nobs = (~np.isnan(weighted_avg)).astype(np.int64)
        result[0] = np.where(nobs >= minimum_periods, weighted_avg, np.nan)

        for i in range(1, len(values)):
            cur = values[i]
            is_observations = ~np.isnan(cur)
            nobs += is_observations.astype(np.int64)
            for j in numba.prange(len(cur)):
                if not np.isnan(weighted_avg[j]):
                    if is_observations[j] or not ignore_na:
                        # note that len(deltas) = len(vals) - 1 and deltas[i]
                        # is to be used in conjunction with vals[i+1]
                        # NOTE(review): the note above describes indexing by
                        # the row counter ``i``, but the code indexes by the
                        # column counter ``j``. Kept as-is because the caller
                        # that sizes ``deltas`` is not visible here -- confirm
                        # which one is intended before changing it.
                        old_wt[j] *= old_wt_factor ** deltas[j - 1]
                        if is_observations[j]:
                            # avoid numerical errors on constant series
                            if weighted_avg[j] != cur[j]:
                                weighted_avg[j] = (
                                    (old_wt[j] * weighted_avg[j]) + (new_wt * cur[j])
                                ) / (old_wt[j] + new_wt)
                            if adjust:
                                old_wt[j] += new_wt
                            else:
                                old_wt[j] = 1.0
                elif is_observations[j]:
                    # No running average yet for this column: seed it with
                    # the first non-NaN observation.
                    weighted_avg[j] = cur[j]

            # np.where allocates a new array, so each result row is a
            # snapshot of the running average at step i.
            result[i] = np.where(nobs >= minimum_periods, weighted_avg, np.nan)

        return result, old_wt

    return online_ewma

88 

89 

class EWMMeanState:
    """
    Hold the state of an online exponentially weighted mean between calls.

    Parameters
    ----------
    com : float
        Center of mass; the smoothing factor is ``alpha = 1 / (1 + com)``.
    adjust : bool
        Forwarded to the ewm kernel. Also selects the weight given to a new
        observation: ``1.0`` when True, ``alpha`` when False.
    ignore_na : bool
        Forwarded to the ewm kernel.
    axis : int
        Used together with ``shape`` to size the weight array.
    shape : tuple of int
        Shape of the data; the weight array has ``shape[axis - 1]`` entries.

    Attributes
    ----------
    new_wt : float
        Weight applied to each new observation.
    old_wt_factor : float
        ``1 - alpha``, the decay factor handed to the ewm kernel.
    old_wt : np.ndarray
        Current per-entry weights, all ones after construction or ``reset``.
    last_ewm : np.ndarray or None
        Last row of the most recent ``run_ewm`` result, or None when no
        result has been computed since construction or ``reset``.
    """

    def __init__(self, com, adjust, ignore_na, axis, shape) -> None:
        alpha = 1.0 / (1.0 + com)
        self.axis = axis
        self.shape = shape
        self.adjust = adjust
        self.ignore_na = ignore_na
        self.new_wt = 1.0 if adjust else alpha
        self.old_wt_factor = 1.0 - alpha
        # Single source of truth for the initial old_wt / last_ewm values.
        self.reset()

    def run_ewm(self, weighted_avg, deltas, min_periods, ewm_func):
        """
        Run ``ewm_func`` with the stored state and remember its outcome.

        Parameters
        ----------
        weighted_avg : np.ndarray
            Values handed to ``ewm_func`` as its first argument.
        deltas : np.ndarray
            Passed through to ``ewm_func`` unchanged.
        min_periods : int
            Passed through to ``ewm_func`` unchanged.
        ewm_func : callable
            Called as ``ewm_func(weighted_avg, deltas, min_periods,
            old_wt_factor, new_wt, old_wt, adjust, ignore_na)`` and expected
            to return ``(result, old_wt)``.

        Returns
        -------
        The ``result`` returned by ``ewm_func``.
        """
        result, old_wt = ewm_func(
            weighted_avg,
            deltas,
            min_periods,
            self.old_wt_factor,
            self.new_wt,
            self.old_wt,
            self.adjust,
            self.ignore_na,
        )
        # Carry the weights and the final row over to the next call.
        self.old_wt = old_wt
        self.last_ewm = result[-1]
        return result

    def reset(self) -> None:
        """Restore the initial weights (all ones) and forget the last result."""
        self.old_wt = np.ones(self.shape[self.axis - 1])
        self.last_ewm = None