Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/pandas/core/window/online.py: 15%
52 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1from __future__ import annotations
3from typing import TYPE_CHECKING
5import numpy as np
7from pandas.compat._optional import import_optional_dependency
10def generate_online_numba_ewma_func(
11 nopython: bool,
12 nogil: bool,
13 parallel: bool,
14):
15 """
16 Generate a numba jitted groupby ewma function specified by values
17 from engine_kwargs.
19 Parameters
20 ----------
21 nopython : bool
22 nopython to be passed into numba.jit
23 nogil : bool
24 nogil to be passed into numba.jit
25 parallel : bool
26 parallel to be passed into numba.jit
28 Returns
29 -------
30 Numba function
31 """
32 if TYPE_CHECKING:
33 import numba
34 else:
35 numba = import_optional_dependency("numba")
37 @numba.jit(nopython=nopython, nogil=nogil, parallel=parallel)
38 def online_ewma(
39 values: np.ndarray,
40 deltas: np.ndarray,
41 minimum_periods: int,
42 old_wt_factor: float,
43 new_wt: float,
44 old_wt: np.ndarray,
45 adjust: bool,
46 ignore_na: bool,
47 ):
48 """
49 Compute online exponentially weighted mean per column over 2D values.
51 Takes the first observation as is, then computes the subsequent
52 exponentially weighted mean accounting minimum periods.
53 """
54 result = np.empty(values.shape)
55 weighted_avg = values[0]
56 nobs = (~np.isnan(weighted_avg)).astype(np.int64)
57 result[0] = np.where(nobs >= minimum_periods, weighted_avg, np.nan)
59 for i in range(1, len(values)):
60 cur = values[i]
61 is_observations = ~np.isnan(cur)
62 nobs += is_observations.astype(np.int64)
63 for j in numba.prange(len(cur)):
64 if not np.isnan(weighted_avg[j]):
65 if is_observations[j] or not ignore_na:
67 # note that len(deltas) = len(vals) - 1 and deltas[i] is to be
68 # used in conjunction with vals[i+1]
69 old_wt[j] *= old_wt_factor ** deltas[j - 1]
70 if is_observations[j]:
71 # avoid numerical errors on constant series
72 if weighted_avg[j] != cur[j]:
73 weighted_avg[j] = (
74 (old_wt[j] * weighted_avg[j]) + (new_wt * cur[j])
75 ) / (old_wt[j] + new_wt)
76 if adjust:
77 old_wt[j] += new_wt
78 else:
79 old_wt[j] = 1.0
80 elif is_observations[j]:
81 weighted_avg[j] = cur[j]
83 result[i] = np.where(nobs >= minimum_periods, weighted_avg, np.nan)
85 return result, old_wt
87 return online_ewma
90class EWMMeanState:
91 def __init__(self, com, adjust, ignore_na, axis, shape) -> None:
92 alpha = 1.0 / (1.0 + com)
93 self.axis = axis
94 self.shape = shape
95 self.adjust = adjust
96 self.ignore_na = ignore_na
97 self.new_wt = 1.0 if adjust else alpha
98 self.old_wt_factor = 1.0 - alpha
99 self.old_wt = np.ones(self.shape[self.axis - 1])
100 self.last_ewm = None
102 def run_ewm(self, weighted_avg, deltas, min_periods, ewm_func):
103 result, old_wt = ewm_func(
104 weighted_avg,
105 deltas,
106 min_periods,
107 self.old_wt_factor,
108 self.new_wt,
109 self.old_wt,
110 self.adjust,
111 self.ignore_na,
112 )
113 self.old_wt = old_wt
114 self.last_ewm = result[-1]
115 return result
117 def reset(self) -> None:
118 self.old_wt = np.ones(self.shape[self.axis - 1])
119 self.last_ewm = None