Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/pandas/core/window/common.py: 8%

94 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1"""Common utility functions for rolling operations""" 

2from __future__ import annotations 

3 

4from collections import defaultdict 

5from typing import cast 

6import warnings 

7 

8import numpy as np 

9 

10from pandas.util._exceptions import find_stack_level 

11 

12from pandas.core.dtypes.generic import ( 

13 ABCDataFrame, 

14 ABCSeries, 

15) 

16 

17from pandas.core.indexes.api import MultiIndex 

18 

19 

def flex_binary_moment(arg1, arg2, f, pairwise=False):
    """
    Apply a binary moment function ``f`` to every required pairing of the
    columns of ``arg1`` and ``arg2``.

    Parameters
    ----------
    arg1, arg2 : Series or DataFrame
        The two pandas objects to combine. If only one is a DataFrame the
        arguments are swapped via a recursive call so the DataFrame comes first.
    f : callable
        Binary function taking two aligned Series and returning a Series
        (e.g. a rolling covariance/correlation kernel).
    pairwise : bool, default False
        For DataFrame/DataFrame input: if False, combine only like-named
        columns; if True, combine every column of ``arg1`` with every column
        of ``arg2`` and return a MultiIndex-ed result.
    """

    # Series/Series: mask both onto a common index, then apply f directly.
    if isinstance(arg1, ABCSeries) and isinstance(arg2, ABCSeries):
        X, Y = prep_binary(arg1, arg2)
        return f(X, Y)

    elif isinstance(arg1, ABCDataFrame):
        from pandas import DataFrame

        def dataframe_from_int_dict(data, frame_template):
            # Build a frame from {positional column index -> Series}, then map
            # the integer keys back to the template's real column labels.
            result = DataFrame(data, index=frame_template.index)
            if len(result.columns) > 0:
                result.columns = frame_template.columns[result.columns]
            return result

        results = {}
        if isinstance(arg2, ABCDataFrame):
            if pairwise is False:
                if arg1 is arg2:
                    # special case in order to handle duplicate column names
                    for i in range(len(arg1.columns)):
                        results[i] = f(arg1.iloc[:, i], arg2.iloc[:, i])
                    return dataframe_from_int_dict(results, arg1)
                else:
                    # Distinct frames: match columns by label, so labels must
                    # be unique on both sides.
                    if not arg1.columns.is_unique:
                        raise ValueError("'arg1' columns are not unique")
                    if not arg2.columns.is_unique:
                        raise ValueError("'arg2' columns are not unique")
                    X, Y = arg1.align(arg2, join="outer")
                    X, Y = prep_binary(X, Y)
                    res_columns = arg1.columns.union(arg2.columns)
                    # Only columns present in both frames get a computed
                    # result; the rest stay all-NaN in the output frame.
                    for col in res_columns:
                        if col in X and col in Y:
                            results[col] = f(X[col], Y[col])
                    return DataFrame(results, index=X.index, columns=res_columns)
            elif pairwise is True:
                # Full cross product of columns: results[i][j] holds
                # f(arg1 col i, arg2 col j).
                results = defaultdict(dict)
                for i in range(len(arg1.columns)):
                    for j in range(len(arg2.columns)):
                        if j < i and arg2 is arg1:
                            # Symmetric case
                            results[i][j] = results[j][i]
                        else:
                            results[i][j] = f(
                                *prep_binary(arg1.iloc[:, i], arg2.iloc[:, j])
                            )

                from pandas import concat

                result_index = arg1.index.union(arg2.index)
                if len(result_index):

                    # construct result frame
                    # Stack the per-(i, j) Series: rows vary fastest over j
                    # within each i, columns are arg1's columns.
                    result = concat(
                        [
                            concat(
                                [results[i][j] for j in range(len(arg2.columns))],
                                ignore_index=True,
                            )
                            for i in range(len(arg1.columns))
                        ],
                        ignore_index=True,
                        axis=1,
                    )
                    result.columns = arg1.columns

                    # set the index and reorder
                    if arg2.columns.nlevels > 1:
                        # mypy needs to know columns is a MultiIndex, Index doesn't
                        # have levels attribute
                        arg2.columns = cast(MultiIndex, arg2.columns)
                        # GH 21157: Equivalent to MultiIndex.from_product(
                        #  [result_index], <unique combinations of arg2.columns.levels>,
                        # )
                        # A normal MultiIndex.from_product will produce too many
                        # combinations.
                        result_level = np.tile(
                            result_index, len(result) // len(result_index)
                        )
                        arg2_levels = (
                            np.repeat(
                                arg2.columns.get_level_values(i),
                                len(result) // len(arg2.columns),
                            )
                            for i in range(arg2.columns.nlevels)
                        )
                        result_names = list(arg2.columns.names) + [result_index.name]
                        result.index = MultiIndex.from_arrays(
                            [*arg2_levels, result_level], names=result_names
                        )
                        # GH 34440
                        # Rotate the original-index level to the front so the
                        # layout matches the single-level branch below.
                        num_levels = len(result.index.levels)
                        new_order = [num_levels - 1] + list(range(num_levels - 1))
                        result = result.reorder_levels(new_order).sort_index()
                    else:
                        # Temporary integer product index just to sort rows into
                        # (index value, column) order, then relabel with the
                        # real (result_index, arg2.columns) product.
                        result.index = MultiIndex.from_product(
                            [range(len(arg2.columns)), range(len(result_index))]
                        )
                        result = result.swaplevel(1, 0).sort_index()
                        result.index = MultiIndex.from_product(
                            [result_index] + [arg2.columns]
                        )
                else:

                    # empty result
                    result = DataFrame(
                        index=MultiIndex(
                            levels=[arg1.index, arg2.columns], codes=[[], []]
                        ),
                        columns=arg2.columns,
                        dtype="float64",
                    )

                # reset our index names to arg1 names
                # reset our column names to arg2 names
                # careful not to mutate the original names
                result.columns = result.columns.set_names(arg1.columns.names)
                result.index = result.index.set_names(
                    result_index.names + arg2.columns.names
                )

                return result
        else:
            # DataFrame/Series: apply f between each column of arg1 and arg2.
            results = {
                i: f(*prep_binary(arg1.iloc[:, i], arg2))
                for i in range(len(arg1.columns))
            }
            return dataframe_from_int_dict(results, arg1)

    else:
        # arg1 is a Series but arg2 is a DataFrame: swap so the DataFrame
        # logic above handles it.
        return flex_binary_moment(arg2, arg1, f)

151 

152 

def zsqrt(x):
    """
    Element-wise square root that maps negative inputs to 0 instead of NaN.

    Works on Series, DataFrame, or ndarray; floating-point warnings from
    ``np.sqrt`` on negative values are suppressed.
    """
    with np.errstate(all="ignore"):
        result = np.sqrt(x)
        negative = x < 0

    # DataFrames need the underlying ndarray for a scalar truth test;
    # Series/ndarray masks support .any() directly.
    if isinstance(x, ABCDataFrame):
        has_negative = negative._values.any()
    else:
        has_negative = negative.any()

    if has_negative:
        result[negative] = 0

    return result

166 

167 

def prep_binary(arg1, arg2):
    """
    Cross-mask two operands onto a common basis.

    Adding ``0 * other`` to each argument propagates the other operand's
    missing values (and, for pandas objects, its index alignment) into the
    result, so both outputs share a common index and NaN mask.
    """
    masked_first = arg1 + 0 * arg2
    masked_second = arg2 + 0 * arg1
    return masked_first, masked_second

173 

174 

def maybe_warn_args_and_kwargs(cls, kernel: str, args, kwargs) -> None:
    """
    Warn for deprecation of args and kwargs in rolling/expanding functions.

    Parameters
    ----------
    cls : type
        Class to warn about.
    kernel : str
        Operation name.
    args : tuple or None
        args passed by user. Will be None if and only if kernel does not have args.
    kwargs : dict or None
        kwargs passed by user. Will be None if and only if kernel does not have kwargs.
    """
    # Collect which of args/kwargs were actually supplied and non-empty.
    offending = [
        name
        for name, value in (("args", args), ("kwargs", kwargs))
        if value is not None and len(value) > 0
    ]
    if not offending:
        return

    # Produces "args", "kwargs", or "args and kwargs".
    msg = " and ".join(offending)
    warnings.warn(
        f"Passing additional {msg} to {cls.__name__}.{kernel} has "
        "no impact on the result and is deprecated. This will "
        "raise a TypeError in a future version of pandas.",
        category=FutureWarning,
        stacklevel=find_stack_level(),
    )