Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/pandas/core/window/common.py: 8%
94 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1"""Common utility functions for rolling operations"""
2from __future__ import annotations
4from collections import defaultdict
5from typing import cast
6import warnings
8import numpy as np
10from pandas.util._exceptions import find_stack_level
12from pandas.core.dtypes.generic import (
13 ABCDataFrame,
14 ABCSeries,
15)
17from pandas.core.indexes.api import MultiIndex
def flex_binary_moment(arg1, arg2, f, pairwise=False):
    """
    Apply the binary function ``f`` to ``arg1`` and ``arg2``, dispatching on
    whether each argument is a Series or a DataFrame.

    Parameters
    ----------
    arg1, arg2 : Series or DataFrame
        Operands, classified with ``ABCSeries`` / ``ABCDataFrame``.
    f : callable
        Called with two positional arguments: a pair of Series/columns, in
        most branches after they have been passed through ``prep_binary``.
    pairwise : bool, default False
        Only consulted when both arguments are DataFrames. ``False`` applies
        ``f`` to matching columns; ``True`` applies ``f`` to every
        (``arg1`` column, ``arg2`` column) pair. The value is compared by
        identity (``is False`` / ``is True``), so any other value matches
        neither branch and the function falls through, returning ``None``.

    Returns
    -------
    object
        * Series / Series: whatever ``f`` returns.
        * DataFrame / DataFrame, ``pairwise is False``: a DataFrame holding
          one result of ``f`` per matched column.
        * DataFrame / DataFrame, ``pairwise is True``: a DataFrame whose
          index is a MultiIndex and whose column names are taken from
          ``arg1.columns``.
        * DataFrame / anything else: a DataFrame with one result of ``f``
          per column of ``arg1``.

    Raises
    ------
    ValueError
        In the non-pairwise DataFrame / DataFrame case, when the two frames
        are distinct objects and either of them has non-unique columns.
    """
    if isinstance(arg1, ABCSeries) and isinstance(arg2, ABCSeries):
        # Series / Series: align via prep_binary, then apply f once.
        X, Y = prep_binary(arg1, arg2)
        return f(X, Y)

    elif isinstance(arg1, ABCDataFrame):
        # Imported here rather than at module level.
        from pandas import DataFrame

        def dataframe_from_int_dict(data, frame_template):
            # ``data`` is keyed by integer column position; build a frame on
            # the template's index and translate the positional keys back to
            # the template's column labels.
            result = DataFrame(data, index=frame_template.index)
            if len(result.columns) > 0:
                result.columns = frame_template.columns[result.columns]
            return result

        results = {}
        if isinstance(arg2, ABCDataFrame):
            if pairwise is False:
                if arg1 is arg2:
                    # special case in order to handle duplicate column names
                    # (columns are addressed by position, never by label)
                    for i in range(len(arg1.columns)):
                        results[i] = f(arg1.iloc[:, i], arg2.iloc[:, i])
                    return dataframe_from_int_dict(results, arg1)
                else:
                    # Label-based matching below requires unique labels.
                    if not arg1.columns.is_unique:
                        raise ValueError("'arg1' columns are not unique")
                    if not arg2.columns.is_unique:
                        raise ValueError("'arg2' columns are not unique")
                    X, Y = arg1.align(arg2, join="outer")
                    X, Y = prep_binary(X, Y)
                    res_columns = arg1.columns.union(arg2.columns)
                    for col in res_columns:
                        # f is only applied to labels present in both
                        # operands; other labels get no entry in ``results``
                        # but are still listed in ``res_columns``.
                        if col in X and col in Y:
                            results[col] = f(X[col], Y[col])
                    return DataFrame(results, index=X.index, columns=res_columns)
            elif pairwise is True:
                # results[i][j] holds f applied to column i of arg1 and
                # column j of arg2 (both addressed by position).
                results = defaultdict(dict)
                for i in range(len(arg1.columns)):
                    for j in range(len(arg2.columns)):
                        if j < i and arg2 is arg1:
                            # Symmetric case: reuse the (j, i) entry, which
                            # was computed on an earlier outer iteration.
                            results[i][j] = results[j][i]
                        else:
                            results[i][j] = f(
                                *prep_binary(arg1.iloc[:, i], arg2.iloc[:, j])
                            )

                from pandas import concat

                result_index = arg1.index.union(arg2.index)
                if len(result_index):

                    # construct result frame: for each arg1 column i, the
                    # results for every arg2 column j are concatenated end to
                    # end (inner concat); those pieces are then placed side
                    # by side, one per arg1 column (outer concat, axis=1).
                    result = concat(
                        [
                            concat(
                                [results[i][j] for j in range(len(arg2.columns))],
                                ignore_index=True,
                            )
                            for i in range(len(arg1.columns))
                        ],
                        ignore_index=True,
                        axis=1,
                    )
                    result.columns = arg1.columns

                    # set the index and reorder
                    if arg2.columns.nlevels > 1:
                        # mypy needs to know columns is a MultiIndex, Index doesn't
                        # have levels attribute
                        # NOTE(review): this assigns to an attribute of the
                        # caller's ``arg2`` object.
                        arg2.columns = cast(MultiIndex, arg2.columns)
                        # GH 21157: Equivalent to MultiIndex.from_product(
                        #  [result_index], <unique combinations of arg2.columns.levels>,
                        # )
                        # A normal MultiIndex.from_product will produce too many
                        # combinations.
                        # result_index repeated as a whole block, once per
                        # len(result) // len(result_index).
                        result_level = np.tile(
                            result_index, len(result) // len(result_index)
                        )
                        # For each level of arg2.columns, every value is
                        # repeated len(result) // len(arg2.columns) times.
                        arg2_levels = (
                            np.repeat(
                                arg2.columns.get_level_values(i),
                                len(result) // len(arg2.columns),
                            )
                            for i in range(arg2.columns.nlevels)
                        )
                        result_names = list(arg2.columns.names) + [result_index.name]
                        result.index = MultiIndex.from_arrays(
                            [*arg2_levels, result_level], names=result_names
                        )
                        # GH 34440
                        # Move the last level (built from result_index) to
                        # the front, keeping the other levels in order.
                        num_levels = len(result.index.levels)
                        new_order = [num_levels - 1] + list(range(num_levels - 1))
                        result = result.reorder_levels(new_order).sort_index()
                    else:
                        # Temporary positional index: (arg2 column position,
                        # row position). It is swapped and sorted, then
                        # replaced by the real labels.
                        result.index = MultiIndex.from_product(
                            [range(len(arg2.columns)), range(len(result_index))]
                        )
                        result = result.swaplevel(1, 0).sort_index()
                        result.index = MultiIndex.from_product(
                            [result_index] + [arg2.columns]
                        )
                else:

                    # empty result
                    result = DataFrame(
                        index=MultiIndex(
                            levels=[arg1.index, arg2.columns], codes=[[], []]
                        ),
                        columns=arg2.columns,
                        dtype="float64",
                    )

                # reset our index names to arg1 names
                # reset our column names to arg2 names
                # careful not to mutate the original names
                # NOTE(review): as written, the column names are taken from
                # arg1.columns.names and the index names from
                # result_index.names + arg2.columns.names.
                result.columns = result.columns.set_names(arg1.columns.names)
                result.index = result.index.set_names(
                    result_index.names + arg2.columns.names
                )

                return result
        else:
            # arg2 is not a DataFrame: apply f to each arg1 column (by
            # position) paired with arg2.
            results = {
                i: f(*prep_binary(arg1.iloc[:, i], arg2))
                for i in range(len(arg1.columns))
            }
            return dataframe_from_int_dict(results, arg1)

    else:
        # arg1 is not a DataFrame: swap the operands and retry. ``pairwise``
        # is not forwarded, so the retry uses its default (False).
        # NOTE(review): if arg2 is not a DataFrame either (and the pair is not
        # Series / Series) no branch terminates this recursion; callers
        # presumably validate the operand types first - confirm.
        return flex_binary_moment(arg2, arg1, f)
def zsqrt(x):
    """
    Element-wise square root of ``x`` with every negative input mapped to 0.

    Parameters
    ----------
    x : array-like supporting ``np.sqrt``, ``<`` and boolean-mask assignment
        A DataFrame is detected via ``ABCDataFrame`` and handled separately
        when testing whether any element is negative.

    Returns
    -------
    Same kind of object as ``np.sqrt(x)``
        The square roots, with 0 written wherever ``x < 0``.
    """
    # Floating-point warnings from sqrt of negatives are silenced here;
    # those positions are overwritten below.
    with np.errstate(all="ignore"):
        roots = np.sqrt(x)
        negative = x < 0

    # For a DataFrame the "any negative?" check goes through the underlying
    # values array; everything else is asked directly.
    if isinstance(x, ABCDataFrame):
        has_negative = negative._values.any()
    else:
        has_negative = negative.any()

    if has_negative:
        roots[negative] = 0

    return roots
def prep_binary(arg1, arg2):
    """
    Return ``arg1`` and ``arg2`` with each one's values masked by the other.

    Each operand has ``0 * other`` added to it. Any position where the other
    operand is missing (or is not finite) therefore becomes NaN, and for
    label-aligned operands the addition also yields a common index.

    Returns
    -------
    tuple
        ``(arg1 + 0 * arg2, arg2 + 0 * arg1)``
    """
    return arg1 + 0 * arg2, arg2 + 0 * arg1
def maybe_warn_args_and_kwargs(cls, kernel: str, args, kwargs) -> None:
    """
    Emit a FutureWarning when extra args/kwargs reach a rolling/expanding kernel.

    Parameters
    ----------
    cls : type
        Class whose ``__name__`` is reported in the warning.
    kernel : str
        Name of the operation, reported in the warning.
    args : tuple or None
        Positional extras supplied by the user; ``None`` when the kernel
        accepts no args.
    kwargs : dict or None
        Keyword extras supplied by the user; ``None`` when the kernel
        accepts no kwargs.
    """
    # Collect the label of every non-empty, non-None extra, in fixed order.
    offending = [
        label
        for label, passed in (("args", args), ("kwargs", kwargs))
        if passed is not None and len(passed) > 0
    ]
    if not offending:
        # Nothing extra was passed: stay silent.
        return

    msg = " and ".join(offending)
    warnings.warn(
        f"Passing additional {msg} to {cls.__name__}.{kernel} has "
        "no impact on the result and is deprecated. This will "
        "raise a TypeError in a future version of pandas.",
        category=FutureWarning,
        stacklevel=find_stack_level(),
    )