Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/pandas/core/apply.py: 15%
658 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1from __future__ import annotations
3import abc
4from collections import defaultdict
5from functools import partial
6import inspect
7import re
8from typing import (
9 TYPE_CHECKING,
10 Any,
11 Callable,
12 DefaultDict,
13 Dict,
14 Hashable,
15 Iterable,
16 Iterator,
17 List,
18 Sequence,
19 cast,
20)
21import warnings
23import numpy as np
25from pandas._config import option_context
27from pandas._libs import lib
28from pandas._typing import (
29 AggFuncType,
30 AggFuncTypeBase,
31 AggFuncTypeDict,
32 AggObjType,
33 Axis,
34 NDFrameT,
35 npt,
36)
37from pandas.errors import (
38 DataError,
39 SpecificationError,
40)
41from pandas.util._decorators import cache_readonly
42from pandas.util._exceptions import find_stack_level
44from pandas.core.dtypes.cast import is_nested_object
45from pandas.core.dtypes.common import (
46 is_dict_like,
47 is_extension_array_dtype,
48 is_list_like,
49 is_sequence,
50)
51from pandas.core.dtypes.generic import (
52 ABCDataFrame,
53 ABCNDFrame,
54 ABCSeries,
55)
57from pandas.core.algorithms import safe_sort
58from pandas.core.base import SelectionMixin
59import pandas.core.common as com
60from pandas.core.construction import (
61 create_series_with_explicit_dtype,
62 ensure_wrapped_if_datetimelike,
63)
65if TYPE_CHECKING: 65 ↛ 66line 65 didn't jump to line 66, because the condition on line 65 was never true
66 from pandas import (
67 DataFrame,
68 Index,
69 Series,
70 )
71 from pandas.core.groupby import GroupBy
72 from pandas.core.resample import Resampler
73 from pandas.core.window.rolling import BaseWindow
# Raw per-position results of applying ``self.f`` over the generated series:
# integer position -> whatever the user function returned for that row/column.
# Consumed by FrameApply.wrap_results / wrap_results_for_axis.
ResType = Dict[int, Any]
def frame_apply(
    obj: DataFrame,
    func: AggFuncType,
    axis: Axis = 0,
    raw: bool = False,
    result_type: str | None = None,
    args=None,
    kwargs=None,
) -> FrameApply:
    """construct and return a row or column based frame apply object"""
    # Normalize axis aliases (e.g. "index"/"columns") to the integer form
    # before choosing the concrete FrameApply subclass.
    axis = obj._get_axis_number(axis)
    klass: type[FrameApply]
    klass = FrameRowApply if axis == 0 else FrameColumnApply

    return klass(
        obj,
        func,
        raw=raw,
        result_type=result_type,
        args=args,
        kwargs=kwargs,
    )
class Apply(metaclass=abc.ABCMeta):
    """
    Abstract base for the apply/agg/transform machinery.

    Holds the target object (Series, DataFrame, GroupBy, Resampler or
    window object), the user-supplied function(s) and any extra
    args/kwargs, and implements the shared dispatch logic for string,
    list-like, dict-like and callable funcs.  Concrete subclasses supply
    ``apply`` and the axis semantics.
    """

    # set by subclasses (0 for row-based/Series, 1 for column-based)
    axis: int

    def __init__(
        self,
        obj: AggObjType,
        func,
        raw: bool,
        result_type: str | None,
        args,
        kwargs,
    ) -> None:
        self.obj = obj
        self.raw = raw
        # normalize missing args/kwargs to empty containers
        self.args = args or ()
        self.kwargs = kwargs or {}

        if result_type not in [None, "reduce", "broadcast", "expand"]:
            raise ValueError(
                "invalid value for result_type, must be one "
                "of {None, 'reduce', 'broadcast', 'expand'}"
            )

        self.result_type = result_type

        # curry if needed: fold extra args/kwargs into the callable so the
        # rest of the machinery can call it with a single argument.  Strings,
        # ufuncs and list-likes are dispatched differently and keep their
        # args/kwargs separate.
        if (
            (kwargs or args)
            and not isinstance(func, (np.ufunc, str))
            and not is_list_like(func)
        ):

            def f(x):
                return func(x, *args, **kwargs)

        else:
            f = func

        # orig_f: the un-curried user input; f: what we actually call
        self.orig_f: AggFuncType = func
        self.f: AggFuncType = f

    @abc.abstractmethod
    def apply(self) -> DataFrame | Series:
        pass

    def agg(self) -> DataFrame | Series | None:
        """
        Provide an implementation for the aggregators.

        Returns
        -------
        Result of aggregation, or None if agg cannot be performed by
        this method.
        """
        obj = self.obj
        arg = self.f
        args = self.args
        kwargs = self.kwargs

        if isinstance(arg, str):
            return self.apply_str()

        if is_dict_like(arg):
            return self.agg_dict_like()
        elif is_list_like(arg):
            # we require a list, but not a 'str'
            return self.agg_list_like()

        if callable(arg):
            # if the callable is a known cython-implemented reduction
            # (e.g. np.sum -> "sum"), call the optimized method directly
            f = com.get_cython_func(arg)
            if f and not args and not kwargs:
                return getattr(obj, f)()

        # caller can react
        return None

    def transform(self) -> DataFrame | Series:
        """
        Transform a DataFrame or Series.

        Returns
        -------
        DataFrame or Series
            Result of applying ``func`` along the given axis of the
            Series or DataFrame.

        Raises
        ------
        ValueError
            If the transform function fails or does not transform.
        """
        obj = self.obj
        func = self.orig_f
        axis = self.axis
        args = self.args
        kwargs = self.kwargs

        is_series = obj.ndim == 1

        # axis=1 is handled by transposing, transforming along axis 0,
        # then transposing back
        if obj._get_axis_number(axis) == 1:
            assert not is_series
            return obj.T.transform(func, 0, *args, **kwargs).T

        if is_list_like(func) and not is_dict_like(func):
            func = cast(List[AggFuncTypeBase], func)
            # Convert func equivalent dict
            if is_series:
                func = {com.get_callable_name(v) or v: v for v in func}
            else:
                func = {col: func for col in obj}

        if is_dict_like(func):
            func = cast(AggFuncTypeDict, func)
            return self.transform_dict_like(func)

        # func is either str or callable
        func = cast(AggFuncTypeBase, func)
        try:
            result = self.transform_str_or_callable(func)
        except TypeError:
            # TypeErrors propagate unchanged; everything else is wrapped
            raise
        except Exception as err:
            raise ValueError("Transform function failed") from err

        # Functions that transform may return empty Series/DataFrame
        # when the dtype is not appropriate
        if (
            isinstance(result, (ABCSeries, ABCDataFrame))
            and result.empty
            and not obj.empty
        ):
            raise ValueError("Transform function failed")
        # error: Argument 1 to "__get__" of "AxisProperty" has incompatible type
        # "Union[Series, DataFrame, GroupBy[Any], SeriesGroupBy,
        # DataFrameGroupBy, BaseWindow, Resampler]"; expected "Union[DataFrame,
        # Series]"
        if not isinstance(result, (ABCSeries, ABCDataFrame)) or not result.index.equals(
            obj.index  # type:ignore[arg-type]
        ):
            raise ValueError("Function did not transform")

        return result

    def transform_dict_like(self, func):
        """
        Compute transform in the case of a dict-like func
        """
        from pandas.core.reshape.concat import concat

        obj = self.obj
        args = self.args
        kwargs = self.kwargs

        # transform is currently only for Series/DataFrame
        assert isinstance(obj, ABCNDFrame)

        if len(func) == 0:
            raise ValueError("No transform functions were provided")

        func = self.normalize_dictlike_arg("transform", obj, func)

        results: dict[Hashable, DataFrame | Series] = {}
        failed_names = []
        all_type_errors = True
        for name, how in func.items():
            colg = obj._gotitem(name, ndim=1)
            try:
                results[name] = colg.transform(how, 0, *args, **kwargs)
            except Exception as err:
                # these two messages are re-raised verbatim; matching on the
                # message text is intentional (they originate in transform())
                if str(err) in {
                    "Function did not transform",
                    "No transform functions were provided",
                }:
                    raise err
                else:
                    if not isinstance(err, TypeError):
                        all_type_errors = False
                    failed_names.append(name)
        # combine results
        if not results:
            # preserve TypeError only when every failure was a TypeError
            klass = TypeError if all_type_errors else ValueError
            raise klass("Transform function failed")
        if len(failed_names) > 0:
            warnings.warn(
                f"{failed_names} did not transform successfully. If any error is "
                f"raised, this will raise in a future version of pandas. "
                f"Drop these columns/ops to avoid this warning.",
                FutureWarning,
                stacklevel=find_stack_level(),
            )
        return concat(results, axis=1)

    def transform_str_or_callable(self, func) -> DataFrame | Series:
        """
        Compute transform in the case of a string or callable func
        """
        obj = self.obj
        args = self.args
        kwargs = self.kwargs

        if isinstance(func, str):
            return self._try_aggregate_string_function(obj, func, *args, **kwargs)

        if not args and not kwargs:
            f = com.get_cython_func(func)
            if f:
                return getattr(obj, f)()

        # Two possible ways to use a UDF - apply or call directly
        try:
            return obj.apply(func, args=args, **kwargs)
        except Exception:
            return func(obj, *args, **kwargs)

    def agg_list_like(self) -> DataFrame | Series:
        """
        Compute aggregation in the case of a list-like argument.

        Returns
        -------
        Result of aggregation.
        """
        from pandas.core.reshape.concat import concat

        obj = self.obj
        arg = cast(List[AggFuncTypeBase], self.f)

        if getattr(obj, "axis", 0) == 1:
            raise NotImplementedError("axis other than 0 is not supported")

        if not isinstance(obj, SelectionMixin):
            # i.e. obj is Series or DataFrame
            selected_obj = obj
        elif obj._selected_obj.ndim == 1:
            # For SeriesGroupBy this matches _obj_with_exclusions
            selected_obj = obj._selected_obj
        else:
            selected_obj = obj._obj_with_exclusions

        results = []
        keys = []
        failed_names = []

        depr_nuisance_columns_msg = (
            "{} did not aggregate successfully. If any error is "
            "raised this will raise in a future version of pandas. "
            "Drop these columns/ops to avoid this warning."
        )

        # degenerate case: a single series, aggregate each func against it
        if selected_obj.ndim == 1:
            for a in arg:
                colg = obj._gotitem(selected_obj.name, ndim=1, subset=selected_obj)
                try:
                    new_res = colg.aggregate(a)

                except TypeError:
                    failed_names.append(com.get_callable_name(a) or a)
                else:
                    results.append(new_res)

                    # make sure we find a good name
                    name = com.get_callable_name(a) or a
                    keys.append(name)

        # multiples
        else:
            indices = []
            for index, col in enumerate(selected_obj):
                colg = obj._gotitem(col, ndim=1, subset=selected_obj.iloc[:, index])
                try:
                    # Capture and suppress any warnings emitted by us in the call
                    # to agg below, but pass through any warnings that were
                    # generated otherwise.
                    # This is necessary because of https://bugs.python.org/issue29672
                    # See GH #43741 for more details
                    with warnings.catch_warnings(record=True) as record:
                        new_res = colg.aggregate(arg)
                    if len(record) > 0:
                        match = re.compile(depr_nuisance_columns_msg.format(".*"))
                        for warning in record:
                            if re.match(match, str(warning.message)):
                                failed_names.append(col)
                            else:
                                # re-emit unrelated warnings at their original
                                # location
                                warnings.warn_explicit(
                                    message=warning.message,
                                    category=warning.category,
                                    filename=warning.filename,
                                    lineno=warning.lineno,
                                )

                except (TypeError, DataError):
                    failed_names.append(col)
                except ValueError as err:
                    # cannot aggregate
                    if "Must produce aggregated value" in str(err):
                        # raised directly in _aggregate_named
                        failed_names.append(col)
                    elif "no results" in str(err):
                        # reached in test_frame_apply.test_nuiscance_columns
                        # where the colg.aggregate(arg) ends up going through
                        # the selected_obj.ndim == 1 branch above with arg == ["sum"]
                        # on a datetime64[ns] column
                        failed_names.append(col)
                    else:
                        raise
                else:
                    results.append(new_res)
                    indices.append(index)

            keys = selected_obj.columns.take(indices)

        # if we are empty
        if not len(results):
            raise ValueError("no results")

        if len(failed_names) > 0:
            warnings.warn(
                depr_nuisance_columns_msg.format(failed_names),
                FutureWarning,
                stacklevel=find_stack_level(),
            )

        try:
            concatenated = concat(results, keys=keys, axis=1, sort=False)
        except TypeError as err:
            # we are concatting non-NDFrame objects,
            # e.g. a list of scalars
            from pandas import Series

            result = Series(results, index=keys, name=obj.name)
            if is_nested_object(result):
                raise ValueError(
                    "cannot combine transform and aggregation operations"
                ) from err
            return result
        else:
            # Concat uses the first index to determine the final indexing order.
            # The union of a shorter first index with the other indices causes
            # the index sorting to be different from the order of the aggregating
            # functions. Reindex if this is the case.
            index_size = concatenated.index.size
            full_ordered_index = next(
                result.index for result in results if result.index.size == index_size
            )
            return concatenated.reindex(full_ordered_index, copy=False)

    def agg_dict_like(self) -> DataFrame | Series:
        """
        Compute aggregation in the case of a dict-like argument.

        Returns
        -------
        Result of aggregation.
        """
        from pandas import Index
        from pandas.core.reshape.concat import concat

        obj = self.obj
        arg = cast(AggFuncTypeDict, self.f)

        if getattr(obj, "axis", 0) == 1:
            raise NotImplementedError("axis other than 0 is not supported")

        if not isinstance(obj, SelectionMixin):
            # i.e. obj is Series or DataFrame
            selected_obj = obj
            selection = None
        else:
            selected_obj = obj._selected_obj
            selection = obj._selection

        arg = self.normalize_dictlike_arg("agg", selected_obj, arg)

        if selected_obj.ndim == 1:
            # key only used for output
            colg = obj._gotitem(selection, ndim=1)
            results = {key: colg.agg(how) for key, how in arg.items()}
        else:
            # key used for column selection and output
            results = {
                key: obj._gotitem(key, ndim=1).agg(how) for key, how in arg.items()
            }

        # set the final keys
        keys = list(arg.keys())

        # Avoid making two isinstance calls in all and any below
        is_ndframe = [isinstance(r, ABCNDFrame) for r in results.values()]

        # combine results
        if all(is_ndframe):
            keys_to_use: Iterable[Hashable]
            keys_to_use = [k for k in keys if not results[k].empty]
            # Have to check, if at least one DataFrame is not empty.
            keys_to_use = keys_to_use if keys_to_use != [] else keys
            if selected_obj.ndim == 2:
                # keys are columns, so we can preserve names
                ktu = Index(keys_to_use)
                ktu._set_names(selected_obj.columns.names)
                keys_to_use = ktu

            axis = 0 if isinstance(obj, ABCSeries) else 1
            result = concat(
                {k: results[k] for k in keys_to_use}, axis=axis, keys=keys_to_use
            )
        elif any(is_ndframe):
            # There is a mix of NDFrames and scalars
            raise ValueError(
                "cannot perform both aggregation "
                "and transformation operations "
                "simultaneously"
            )
        else:
            from pandas import Series

            # we have a dict of scalars
            # GH 36212 use name only if obj is a series
            if obj.ndim == 1:
                obj = cast("Series", obj)
                name = obj.name
            else:
                name = None

            result = Series(results, name=name)

        return result

    def apply_str(self) -> DataFrame | Series:
        """
        Compute apply in case of a string.

        Returns
        -------
        result: Series or DataFrame
        """
        # Caller is responsible for checking isinstance(self.f, str)
        f = cast(str, self.f)

        obj = self.obj

        # Support for `frame.transform('method')`
        # Some methods (shift, etc.) require the axis argument, others
        # don't, so inspect and insert if necessary.
        func = getattr(obj, f, None)
        if callable(func):
            sig = inspect.getfullargspec(func)
            arg_names = (*sig.args, *sig.kwonlyargs)
            # corrwith/mad/skew accept an "axis" argument but do not support
            # axis=1 here, hence the explicit exclusion list
            if self.axis != 0 and (
                "axis" not in arg_names or f in ("corrwith", "mad", "skew")
            ):
                raise ValueError(f"Operation {f} does not support axis=1")
            elif "axis" in arg_names:
                self.kwargs["axis"] = self.axis
            elif self.axis != 0:
                raise ValueError(f"Operation {f} does not support axis=1")
        return self._try_aggregate_string_function(obj, f, *self.args, **self.kwargs)

    def apply_multiple(self) -> DataFrame | Series:
        """
        Compute apply in case of a list-like or dict-like.

        Returns
        -------
        result: Series, DataFrame, or None
            Result when self.f is a list-like or dict-like, None otherwise.
        """
        return self.obj.aggregate(self.f, self.axis, *self.args, **self.kwargs)

    def normalize_dictlike_arg(
        self, how: str, obj: DataFrame | Series, func: AggFuncTypeDict
    ) -> AggFuncTypeDict:
        """
        Handler for dict-like argument.

        Ensures that necessary columns exist if obj is a DataFrame, and
        that a nested renamer is not passed. Also normalizes to all lists
        when values consists of a mix of list and non-lists.
        """
        assert how in ("apply", "agg", "transform")

        # Can't use func.values(); wouldn't work for a Series
        if (
            how == "agg"
            and isinstance(obj, ABCSeries)
            and any(is_list_like(v) for _, v in func.items())
        ) or (any(is_dict_like(v) for _, v in func.items())):
            # GH 15931 - deprecation of renaming keys
            raise SpecificationError("nested renamer is not supported")

        if obj.ndim != 1:
            # Check for missing columns on a frame
            cols = set(func.keys()) - set(obj.columns)
            if len(cols) > 0:
                cols_sorted = list(safe_sort(list(cols)))
                raise KeyError(f"Column(s) {cols_sorted} do not exist")

        aggregator_types = (list, tuple, dict)

        # if we have a dict of any non-scalars
        # eg. {'A' : ['mean']}, normalize all to
        # be list-likes
        # Cannot use func.values() because arg may be a Series
        if any(isinstance(x, aggregator_types) for _, x in func.items()):
            new_func: AggFuncTypeDict = {}
            for k, v in func.items():
                if not isinstance(v, aggregator_types):
                    new_func[k] = [v]
                else:
                    new_func[k] = v
            func = new_func
        return func

    def _try_aggregate_string_function(self, obj, arg: str, *args, **kwargs):
        """
        if arg is a string, then try to operate on it:
        - try to find a function (or attribute) on ourselves
        - try to find a numpy function
        - raise
        """
        assert isinstance(arg, str)

        f = getattr(obj, arg, None)
        if f is not None:
            if callable(f):
                return f(*args, **kwargs)

            # people may try to aggregate on a non-callable attribute
            # but don't let them think they can pass args to it
            assert len(args) == 0
            assert len([kwarg for kwarg in kwargs if kwarg not in ["axis"]]) == 0
            return f

        f = getattr(np, arg, None)
        if f is not None and hasattr(obj, "__array__"):
            # in particular exclude Window
            return f(obj, *args, **kwargs)

        raise AttributeError(
            f"'{arg}' is not a valid function for '{type(obj).__name__}' object"
        )
class NDFrameApply(Apply):
    """
    Methods shared by FrameApply and SeriesApply but
    not GroupByApply or ResamplerWindowApply
    """

    @property
    def index(self) -> Index:
        # index of the underlying Series/DataFrame
        # error: Argument 1 to "__get__" of "AxisProperty" has incompatible type
        # "Union[Series, DataFrame, GroupBy[Any], SeriesGroupBy,
        # DataFrameGroupBy, BaseWindow, Resampler]"; expected "Union[DataFrame,
        # Series]"
        return self.obj.index  # type:ignore[arg-type]

    @property
    def agg_axis(self) -> Index:
        # the axis labels that aggregated results should be indexed by
        return self.obj._get_agg_axis(self.axis)
class FrameApply(NDFrameApply):
    """
    Apply machinery for DataFrames; concrete row/column orientation is
    provided by FrameRowApply (axis=0) and FrameColumnApply (axis=1).
    """

    obj: DataFrame

    # ---------------------------------------------------------------
    # Abstract Methods

    @property
    @abc.abstractmethod
    def result_index(self) -> Index:
        pass

    @property
    @abc.abstractmethod
    def result_columns(self) -> Index:
        pass

    @property
    @abc.abstractmethod
    def series_generator(self) -> Iterator[Series]:
        pass

    @abc.abstractmethod
    def wrap_results_for_axis(
        self, results: ResType, res_index: Index
    ) -> DataFrame | Series:
        pass

    # ---------------------------------------------------------------

    @property
    def res_columns(self) -> Index:
        return self.result_columns

    @property
    def columns(self) -> Index:
        return self.obj.columns

    @cache_readonly
    def values(self):
        # raw ndarray of the frame; cached since it may be consumed repeatedly
        return self.obj.values

    @cache_readonly
    def dtypes(self) -> Series:
        return self.obj.dtypes

    def apply(self) -> DataFrame | Series:
        """compute the results"""
        # dispatch to agg
        if is_list_like(self.f):
            return self.apply_multiple()

        # all empty
        if len(self.columns) == 0 and len(self.index) == 0:
            return self.apply_empty_result()

        # string dispatch
        if isinstance(self.f, str):
            return self.apply_str()

        # ufunc
        elif isinstance(self.f, np.ufunc):
            with np.errstate(all="ignore"):
                results = self.obj._mgr.apply("apply", func=self.f)
            # _constructor will retain self.index and self.columns
            return self.obj._constructor(data=results)

        # broadcasting
        if self.result_type == "broadcast":
            return self.apply_broadcast(self.obj)

        # one axis empty
        elif not all(self.obj.shape):
            return self.apply_empty_result()

        # raw
        elif self.raw:
            return self.apply_raw()

        return self.apply_standard()

    def agg(self):
        """Aggregate, transposing for axis=1 and falling back to apply
        when the base implementation returns None."""
        obj = self.obj
        axis = self.axis

        # TODO: Avoid having to change state
        # temporarily operate on the transpose so the base class only ever
        # sees axis=0; restored in the finally block below
        self.obj = self.obj if self.axis == 0 else self.obj.T
        self.axis = 0

        result = None
        try:
            result = super().agg()
        except TypeError as err:
            exc = TypeError(
                "DataFrame constructor called with "
                f"incompatible data and dtype: {err}"
            )
            raise exc from err
        finally:
            self.obj = obj
            self.axis = axis

        if axis == 1:
            result = result.T if result is not None else result

        if result is None:
            # base agg() declined (plain callable); fall back to apply
            result = self.obj.apply(self.orig_f, axis, args=self.args, **self.kwargs)

        return result

    def apply_empty_result(self):
        """
        we have an empty result; at least 1 axis is 0

        we will try to apply the function to an empty
        series in order to see if this is a reduction function
        """
        assert callable(self.f)

        # we are not asked to reduce or infer reduction
        # so just return a copy of the existing object
        if self.result_type not in ["reduce", None]:
            return self.obj.copy()

        # we may need to infer
        should_reduce = self.result_type == "reduce"

        from pandas import Series

        if not should_reduce:
            # probe the function with an empty Series: if it returns a
            # scalar (not a Series) we treat it as a reduction
            try:
                if self.axis == 0:
                    r = self.f(Series([], dtype=np.float64))
                else:
                    r = self.f(Series(index=self.columns, dtype=np.float64))
            except Exception:
                pass
            else:
                should_reduce = not isinstance(r, Series)

        if should_reduce:
            if len(self.agg_axis):
                r = self.f(Series([], dtype=np.float64))
            else:
                r = np.nan

            return self.obj._constructor_sliced(r, index=self.agg_axis)
        else:
            return self.obj.copy()

    def apply_raw(self):
        """apply to the values as a numpy array"""

        def wrap_function(func):
            """
            Wrap user supplied function to work around numpy issue.

            see https://github.com/numpy/numpy/issues/8352
            """

            def wrapper(*args, **kwargs):
                result = func(*args, **kwargs)
                # str results confuse np.apply_along_axis; box as object
                if isinstance(result, str):
                    result = np.array(result, dtype=object)
                return result

            return wrapper

        result = np.apply_along_axis(wrap_function(self.f), self.axis, self.values)

        # TODO: mixed type case
        if result.ndim == 2:
            return self.obj._constructor(result, index=self.index, columns=self.columns)
        else:
            return self.obj._constructor_sliced(result, index=self.agg_axis)

    def apply_broadcast(self, target: DataFrame) -> DataFrame:
        assert callable(self.f)

        result_values = np.empty_like(target.values)

        # axis which we want to compare compliance
        result_compare = target.shape[0]

        for i, col in enumerate(target.columns):
            res = self.f(target[col])
            ares = np.asarray(res).ndim

            # must be a scalar or 1d
            if ares > 1:
                raise ValueError("too many dims to broadcast")
            elif ares == 1:

                # must match return dim
                if result_compare != len(res):
                    raise ValueError("cannot broadcast result")

            result_values[:, i] = res

        # we *always* preserve the original index / columns
        result = self.obj._constructor(
            result_values, index=target.index, columns=target.columns
        )
        return result

    def apply_standard(self):
        # apply self.f to each generated series, then wrap into the
        # appropriate result shape
        results, res_index = self.apply_series_generator()

        # wrap results
        return self.wrap_results(results, res_index)

    def apply_series_generator(self) -> tuple[ResType, Index]:
        assert callable(self.f)

        series_gen = self.series_generator
        res_index = self.result_index

        results = {}

        with option_context("mode.chained_assignment", None):
            for i, v in enumerate(series_gen):
                # ignore SettingWithCopy here in case the user mutates
                results[i] = self.f(v)
                if isinstance(results[i], ABCSeries):
                    # If we have a view on v, we need to make a copy because
                    # series_generator will swap out the underlying data
                    results[i] = results[i].copy(deep=False)

        return results, res_index

    def wrap_results(self, results: ResType, res_index: Index) -> DataFrame | Series:
        from pandas import Series

        # see if we can infer the results
        if len(results) > 0 and 0 in results and is_sequence(results[0]):
            return self.wrap_results_for_axis(results, res_index)

        # dict of scalars

        # the default dtype of an empty Series will be `object`, but this
        # code can be hit by df.mean() where the result should have dtype
        # float64 even if it's an empty Series.
        constructor_sliced = self.obj._constructor_sliced
        if constructor_sliced is Series:
            result = create_series_with_explicit_dtype(
                results, dtype_if_empty=np.float64
            )
        else:
            # subclassed frames get their own sliced constructor
            result = constructor_sliced(results)
        result.index = res_index

        return result

    def apply_str(self) -> DataFrame | Series:
        # Caller is responsible for checking isinstance(self.f, str)
        # TODO: GH#39993 - Avoid special-casing by replacing with lambda
        if self.f == "size":
            # Special-cased because DataFrame.size returns a single scalar
            obj = self.obj
            value = obj.shape[self.axis]
            return obj._constructor_sliced(value, index=self.agg_axis)
        return super().apply_str()
class FrameRowApply(FrameApply):
    """Apply a function to each column (axis=0); results indexed by columns."""

    axis = 0

    def apply_broadcast(self, target: DataFrame) -> DataFrame:
        return super().apply_broadcast(target)

    @property
    def series_generator(self):
        # one Series per column, extracted positionally
        return (self.obj._ixs(i, axis=1) for i in range(len(self.columns)))

    @property
    def result_index(self) -> Index:
        return self.columns

    @property
    def result_columns(self) -> Index:
        return self.index

    def wrap_results_for_axis(
        self, results: ResType, res_index: Index
    ) -> DataFrame | Series:
        """return the results for the rows"""

        if self.result_type == "reduce":
            # e.g. test_apply_dict GH#8735
            res = self.obj._constructor_sliced(results)
            res.index = res_index
            return res

        elif self.result_type is None and all(
            isinstance(x, dict) for x in results.values()
        ):
            # Our operation was a to_dict op e.g.
            # test_apply_dict GH#8735, test_apply_reduce_to_dict GH#25196 #37544
            res = self.obj._constructor_sliced(results)
            res.index = res_index
            return res

        try:
            result = self.obj._constructor(data=results)
        except ValueError as err:
            # matching on the constructor's message is intentional here
            if "All arrays must be of the same length" in str(err):
                # e.g. result = [[2, 3], [1.5], ['foo', 'bar']]
                # see test_agg_listlike_result GH#29587
                res = self.obj._constructor_sliced(results)
                res.index = res_index
                return res
            else:
                raise

        if not isinstance(results[0], ABCSeries):
            if len(result.index) == len(self.res_columns):
                result.index = self.res_columns

        if len(result.columns) == len(res_index):
            result.columns = res_index

        return result
class FrameColumnApply(FrameApply):
    """Apply a function to each row (axis=1); results indexed by the index."""

    axis = 1

    def apply_broadcast(self, target: DataFrame) -> DataFrame:
        # broadcast over the transpose, then transpose the result back
        result = super().apply_broadcast(target.T)
        return result.T

    @property
    def series_generator(self):
        values = self.values
        values = ensure_wrapped_if_datetimelike(values)
        assert len(values) > 0

        # We create one Series object, and will swap out the data inside
        # of it.  Kids: don't do this at home.
        ser = self.obj._ixs(0, axis=0)
        mgr = ser._mgr

        if is_extension_array_dtype(ser.dtype):
            # values will be incorrect for this block
            # TODO(EA2D): special case would be unnecessary with 2D EAs
            obj = self.obj
            for i in range(len(obj)):
                yield obj._ixs(i, axis=0)

        else:
            for (arr, name) in zip(values, self.index):
                # GH#35462 re-pin mgr in case setitem changed it
                ser._mgr = mgr
                mgr.set_values(arr)
                # bypass Series.name validation for speed
                object.__setattr__(ser, "_name", name)
                yield ser

    @property
    def result_index(self) -> Index:
        return self.index

    @property
    def result_columns(self) -> Index:
        return self.columns

    def wrap_results_for_axis(
        self, results: ResType, res_index: Index
    ) -> DataFrame | Series:
        """return the results for the columns"""
        result: DataFrame | Series

        # we have requested to expand
        if self.result_type == "expand":
            result = self.infer_to_same_shape(results, res_index)

        # we have a non-series and don't want inference
        elif not isinstance(results[0], ABCSeries):
            result = self.obj._constructor_sliced(results)
            result.index = res_index

        # we may want to infer results
        else:
            result = self.infer_to_same_shape(results, res_index)

        return result

    def infer_to_same_shape(self, results: ResType, res_index: Index) -> DataFrame:
        """infer the results to the same shape as the input object"""
        result = self.obj._constructor(data=results)
        result = result.T

        # set the index
        result.index = res_index

        # infer dtypes
        result = result.infer_objects()

        return result
class SeriesApply(NDFrameApply):
    """
    Apply machinery for Series.  ``convert_dtype`` controls whether
    element-wise results are converted to a better dtype by map_infer.
    """

    obj: Series
    axis = 0

    def __init__(
        self,
        obj: Series,
        func: AggFuncType,
        convert_dtype: bool,
        args,
        kwargs,
    ) -> None:
        self.convert_dtype = convert_dtype

        super().__init__(
            obj,
            func,
            raw=False,
            result_type=None,
            args=args,
            kwargs=kwargs,
        )

    def apply(self) -> DataFrame | Series:
        """Dispatch to the appropriate apply path for self.f."""
        obj = self.obj

        if len(obj) == 0:
            return self.apply_empty_result()

        # dispatch to agg
        if is_list_like(self.f):
            return self.apply_multiple()

        if isinstance(self.f, str):
            # if we are a string, try to dispatch
            return self.apply_str()

        # self.f is Callable
        return self.apply_standard()

    def agg(self):
        result = super().agg()
        if result is None:
            f = self.f
            kwargs = self.kwargs

            # string, list-like, and dict-like are entirely handled in super
            assert callable(f)

            # we can be called from an inner function which
            # passes this meta-data
            kwargs.pop("_level", None)

            # try a regular apply, this evaluates lambdas
            # row-by-row; however if the lambda is expected a Series
            # expression, e.g.: lambda x: x-x.quantile(0.25)
            # this will fail, so we can try a vectorized evaluation

            # we cannot FIRST try the vectorized evaluation, because
            # then .agg and .apply would have different semantics if the
            # operation is actually defined on the Series, e.g. str
            try:
                result = self.obj.apply(f)
            except (ValueError, AttributeError, TypeError):
                result = f(self.obj)

        return result

    def apply_empty_result(self) -> Series:
        # preserve dtype/index of the empty input and propagate metadata
        obj = self.obj
        return obj._constructor(dtype=obj.dtype, index=obj.index).__finalize__(
            obj, method="apply"
        )

    def apply_standard(self) -> DataFrame | Series:
        # caller is responsible for ensuring that f is Callable
        f = cast(Callable, self.f)
        obj = self.obj

        with np.errstate(all="ignore"):
            if isinstance(f, np.ufunc):
                # ufuncs operate vectorized on the whole Series
                return f(obj)

            # row-wise access
            if is_extension_array_dtype(obj.dtype) and hasattr(obj._values, "map"):
                # GH#23179 some EAs do not have `map`
                mapped = obj._values.map(f)
            else:
                values = obj.astype(object)._values
                mapped = lib.map_infer(
                    values,
                    f,
                    convert=self.convert_dtype,
                )

        if len(mapped) and isinstance(mapped[0], ABCSeries):
            # GH#43986 Need to do list(mapped) in order to get treated as nested
            # See also GH#25959 regarding EA support
            return obj._constructor_expanddim(list(mapped), index=obj.index)
        else:
            return obj._constructor(mapped, index=obj.index).__finalize__(
                obj, method="apply"
            )
class GroupByApply(Apply):
    """
    Apply machinery for GroupBy objects.

    Only the agg-related paths inherited from ``Apply`` are used here;
    ``apply`` and ``transform`` are deliberately unimplemented.
    """

    def __init__(
        self,
        obj: GroupBy[NDFrameT],
        func: AggFuncType,
        args,
        kwargs,
    ) -> None:
        # Work on a private copy so the caller's kwargs are never mutated
        # by downstream consumers.
        own_kwargs = dict(kwargs)
        # Resolve the requested axis against the underlying grouped frame.
        self.axis = obj.obj._get_axis_number(own_kwargs.get("axis", 0))
        super().__init__(
            obj,
            func,
            raw=False,
            result_type=None,
            args=args,
            kwargs=own_kwargs,
        )

    def apply(self):
        raise NotImplementedError

    def transform(self):
        raise NotImplementedError
class ResamplerWindowApply(Apply):
    """Apply/aggregate machinery for Resampler and rolling-window objects."""

    # Resampling/windowing always operates along axis 0.
    axis = 0
    obj: Resampler | BaseWindow

    def __init__(
        self,
        obj: Resampler | BaseWindow,
        func: AggFuncType,
        args,
        kwargs,
    ) -> None:
        super().__init__(
            obj, func, raw=False, result_type=None, args=args, kwargs=kwargs
        )

    def apply(self):
        # Resampler/Window expose their own apply; not used through this class.
        raise NotImplementedError

    def transform(self):
        raise NotImplementedError
def reconstruct_func(
    func: AggFuncType | None, **kwargs
) -> tuple[bool, AggFuncType | None, list[str] | None, npt.NDArray[np.intp] | None]:
    """
    This is the internal function to reconstruct func given if there is relabeling
    or not and also normalize the keyword to get new order of columns.

    If named aggregation is applied, `func` will be None, and kwargs contains the
    column and aggregation function information to be parsed;
    If named aggregation is not applied, `func` is either string (e.g. 'min') or
    Callable, or list of them (e.g. ['min', np.max]), or the dictionary of column name
    and str/Callable/list of them (e.g. {'A': 'min'}, or {'A': [np.min, lambda x: x]})

    If relabeling is True, will return relabeling, reconstructed func, column
    names, and the reconstructed order of columns.
    If relabeling is False, the columns and order will be None.

    Parameters
    ----------
    func: agg function (e.g. 'min' or Callable) or list of agg functions
        (e.g. ['min', np.max]) or dictionary (e.g. {'A': ['min', np.max]}).
    **kwargs: dict, kwargs used in is_multi_agg_with_relabel and
        normalize_keyword_aggregation function for relabelling

    Returns
    -------
    relabelling: bool, if there is relabelling or not
    func: normalized and mangled func
    columns: list of column names
    order: array of columns indices

    Raises
    ------
    SpecificationError
        If a list ``func`` contains duplicated function names with no relabeling.
    TypeError
        If neither ``func`` nor named-aggregation kwargs were provided.

    Examples
    --------
    >>> reconstruct_func(None, **{"foo": ("col", "min")})
    (True, defaultdict(<class 'list'>, {'col': ['min']}), ('foo',), array([0]))

    >>> reconstruct_func("min")
    (False, 'min', None, None)
    """
    relabeling = func is None and is_multi_agg_with_relabel(**kwargs)
    columns: list[str] | None = None
    order: npt.NDArray[np.intp] | None = None

    if not relabeling:
        if isinstance(func, list) and len(func) > len(set(func)):
            # GH 28426 will raise error if duplicated function names are used and
            # there is no reassigned name
            raise SpecificationError(
                "Function names must be unique if there is no new column names "
                "assigned"
            )
        elif func is None:
            # nicer error message; quote fixed so '(column, aggfunc)' is balanced
            raise TypeError("Must provide 'func' or tuples of '(column, aggfunc)'.")

    if relabeling:
        func, columns, order = normalize_keyword_aggregation(kwargs)

    return relabeling, func, columns, order
def is_multi_agg_with_relabel(**kwargs) -> bool:
    """
    Check whether kwargs passed to .agg look like multi-agg with relabeling.

    Relabeling means every kwarg value is a 2-tuple of (column, aggfunc);
    an empty kwargs is not considered relabeling.

    Parameters
    ----------
    **kwargs : dict

    Returns
    -------
    bool

    Examples
    --------
    >>> is_multi_agg_with_relabel(a="max")
    False
    >>> is_multi_agg_with_relabel(a_max=("a", "max"), a_min=("a", "min"))
    True
    >>> is_multi_agg_with_relabel()
    False
    """
    if not kwargs:
        return False
    return all(
        isinstance(value, tuple) and len(value) == 2 for value in kwargs.values()
    )
def normalize_keyword_aggregation(
    kwargs: dict,
) -> tuple[dict, list[str], npt.NDArray[np.intp]]:
    """
    Normalize user-provided "named aggregation" kwargs.

    Transforms from the new ``Mapping[str, NamedAgg]`` style kwargs
    to the old Dict[str, List[scalar]]].

    Parameters
    ----------
    kwargs : dict

    Returns
    -------
    aggspec : dict
        The transformed kwargs.
    columns : List[str]
        The user-provided keys.
    col_idx_order : List[int]
        List of columns indices.

    Examples
    --------
    >>> normalize_keyword_aggregation({"output": ("input", "sum")})
    (defaultdict(<class 'list'>, {'input': ['sum']}), ('output',), array([0]))
    """
    from pandas.core.indexes.base import Index

    # Normalize the aggregation functions as Mapping[column, List[func]],
    # process normally, then fixup the names.
    # TODO: aggspec type: typing.Dict[str, List[AggScalar]]
    # May be hitting https://github.com/python/mypy/issues/5958
    # saying it doesn't have an attribute __name__
    aggspec: DefaultDict = defaultdict(list)
    columns, pairs = list(zip(*kwargs.items()))

    # Record the user-requested (column, func-name) order before grouping.
    order = [
        (column, com.get_callable_name(aggfunc) or aggfunc)
        for column, aggfunc in pairs
    ]
    for column, aggfunc in pairs:
        aggspec[column].append(aggfunc)

    # uniquify aggfunc name if duplicated in order list
    uniquified_order = _make_unique_kwarg_list(order)

    # GH 25719, due to aggspec will change the order of assigned columns in
    # aggregation, uniquified_aggspec will store uniquified order list and will
    # compare it with order based on index
    aggspec_order = [
        (column, com.get_callable_name(aggfunc) or aggfunc)
        for column, aggfuncs in aggspec.items()
        for aggfunc in aggfuncs
    ]
    uniquified_aggspec = _make_unique_kwarg_list(aggspec_order)

    # get the new index of columns by comparison
    col_idx_order = Index(uniquified_aggspec).get_indexer(uniquified_order)
    return aggspec, columns, col_idx_order
1372def _make_unique_kwarg_list(
1373 seq: Sequence[tuple[Any, Any]]
1374) -> Sequence[tuple[Any, Any]]:
1375 """
1376 Uniquify aggfunc name of the pairs in the order list
1378 Examples:
1379 --------
1380 >>> kwarg_list = [('a', '<lambda>'), ('a', '<lambda>'), ('b', '<lambda>')]
1381 >>> _make_unique_kwarg_list(kwarg_list)
1382 [('a', '<lambda>_0'), ('a', '<lambda>_1'), ('b', '<lambda>')]
1383 """
1384 return [
1385 (pair[0], "_".join([pair[1], str(seq[:i].count(pair))]))
1386 if seq.count(pair) > 1
1387 else pair
1388 for i, pair in enumerate(seq)
1389 ]
def relabel_result(
    result: DataFrame | Series,
    func: dict[str, list[Callable | str]],
    columns: Iterable[Hashable],
    order: Iterable[int],
) -> dict[Hashable, Series]:
    """
    Internal function to reorder result if relabelling is True for
    dataframe.agg, and return the reordered result in dict.

    Parameters:
    ----------
    result: Result from aggregation
    func: Dict of (column name, funcs)
    columns: New columns name for relabelling
    order: New order for relabelling

    Examples:
    ---------
    >>> result = DataFrame({"A": [np.nan, 2, np.nan],
    ...       "C": [6, np.nan, np.nan], "B": [np.nan, 4, 2.5]})  # doctest: +SKIP
    >>> funcs = {"A": ["max"], "C": ["max"], "B": ["mean", "min"]}
    >>> columns = ("foo", "aab", "bar", "dat")
    >>> order = [0, 1, 2, 3]
    >>> _relabel_result(result, func, columns, order)  # doctest: +SKIP
    dict(A=Series([2.0, NaN, NaN, NaN], index=["foo", "aab", "bar", "dat"]),
         C=Series([NaN, 6.0, NaN, NaN], index=["foo", "aab", "bar", "dat"]),
         B=Series([NaN, NaN, 2.5, 4.0], index=["foo", "aab", "bar", "dat"]))
    """
    # local import to avoid a circular dependency at module-import time
    from pandas.core.indexes.base import Index

    # output labels sorted into the user-requested order
    reordered_indexes = [
        pair[0] for pair in sorted(zip(columns, order), key=lambda t: t[1])
    ]
    reordered_result_in_dict: dict[Hashable, Series] = {}
    # running offset into reordered_indexes; advanced by len(fun) per column
    idx = 0

    # only reorder per-column when several columns were aggregated (DataFrame
    # result with >1 column); a single-column result keeps its row order
    reorder_mask = not isinstance(result, ABCSeries) and len(result.columns) > 1
    for col, fun in func.items():
        s = result[col].dropna()

        # In the `_aggregate`, the callable names are obtained and used in `result`, and
        # these names are ordered alphabetically. e.g.
        #           C2   C1
        # <lambda>   1  NaN
        # amax     NaN  4.0
        # max      NaN  4.0
        # sum     18.0  6.0
        # Therefore, the order of functions for each column could be shuffled
        # accordingly so need to get the callable name if it is not parsed names, and
        # reorder the aggregated result for each column.
        # e.g. if df.agg(c1=("C2", sum), c2=("C2", lambda x: min(x))), correct order is
        # [sum, <lambda>], but in `result`, it will be [<lambda>, sum], and we need to
        # reorder so that aggregated values map to their functions regarding the order.

        # However there is only one column being used for aggregation, not need to
        # reorder since the index is not sorted, and keep as is in `funcs`, e.g.
        # A
        # min   1.0
        # mean  1.5
        # mean  1.5
        if reorder_mask:
            # normalize callables to their names so we can match s.index
            fun = [
                com.get_callable_name(f) if not isinstance(f, str) else f for f in fun
            ]
            col_idx_order = Index(s.index).get_indexer(fun)
            s = s[col_idx_order]

        # assign the new user-provided "named aggregation" as index names, and reindex
        # it based on the whole user-provided names.
        s.index = reordered_indexes[idx : idx + len(fun)]
        reordered_result_in_dict[col] = s.reindex(columns, copy=False)
        idx = idx + len(fun)
    return reordered_result_in_dict
1468# TODO: Can't use, because mypy doesn't like us setting __name__
1469# error: "partial[Any]" has no attribute "__name__"
1470# the type is:
1471# typing.Sequence[Callable[..., ScalarResult]]
1472# -> typing.Sequence[Callable[..., ScalarResult]]:
1475def _managle_lambda_list(aggfuncs: Sequence[Any]) -> Sequence[Any]:
1476 """
1477 Possibly mangle a list of aggfuncs.
1479 Parameters
1480 ----------
1481 aggfuncs : Sequence
1483 Returns
1484 -------
1485 mangled: list-like
1486 A new AggSpec sequence, where lambdas have been converted
1487 to have unique names.
1489 Notes
1490 -----
1491 If just one aggfunc is passed, the name will not be mangled.
1492 """
1493 if len(aggfuncs) <= 1:
1494 # don't mangle for .agg([lambda x: .])
1495 return aggfuncs
1496 i = 0
1497 mangled_aggfuncs = []
1498 for aggfunc in aggfuncs:
1499 if com.get_callable_name(aggfunc) == "<lambda>":
1500 aggfunc = partial(aggfunc)
1501 aggfunc.__name__ = f"<lambda_{i}>"
1502 i += 1
1503 mangled_aggfuncs.append(aggfunc)
1505 return mangled_aggfuncs
def maybe_mangle_lambdas(agg_spec: Any) -> Any:
    """
    Make new lambdas with unique names.

    Parameters
    ----------
    agg_spec : Any
        An argument to GroupBy.agg.
        Non-dict-like `agg_spec` are pass through as is.
        For dict-like `agg_spec` a new spec is returned
        with name-mangled lambdas.

    Returns
    -------
    mangled : Any
        Same type as the input.

    Examples
    --------
    >>> maybe_mangle_lambdas('sum')
    'sum'
    >>> maybe_mangle_lambdas([lambda: 1, lambda: 2])  # doctest: +SKIP
    [<function __main__.<lambda_0>,
     <function pandas...._make_lambda.<locals>.f(*args, **kwargs)>]
    """
    is_dict = is_dict_like(agg_spec)
    if not (is_dict or is_list_like(agg_spec)):
        # scalar / string specs pass through untouched
        return agg_spec
    if not is_dict:
        # plain list-like of aggfuncs
        return _managle_lambda_list(agg_spec)

    mangled_aggspec = type(agg_spec)()  # dict or OrderedDict
    for key, aggfuncs in agg_spec.items():
        if is_list_like(aggfuncs) and not is_dict_like(aggfuncs):
            mangled_aggspec[key] = _managle_lambda_list(aggfuncs)
        else:
            mangled_aggspec[key] = aggfuncs

    return mangled_aggspec
def validate_func_kwargs(
    kwargs: dict,
) -> tuple[list[str], list[str | Callable[..., Any]]]:
    """
    Validates types of user-provided "named aggregation" kwargs.
    `TypeError` is raised if aggfunc is not `str` or callable.

    Parameters
    ----------
    kwargs : dict

    Returns
    -------
    columns : List[str]
        List of user-provided keys.
    func : List[Union[str, callable[...,Any]]]
        List of user-provided aggfuncs

    Raises
    ------
    TypeError
        If kwargs is empty or any value is neither a string nor callable.

    Examples
    --------
    >>> validate_func_kwargs({'one': 'min', 'two': 'max'})
    (['one', 'two'], ['min', 'max'])
    """
    if not kwargs:
        raise TypeError("Must provide 'func' or named aggregation **kwargs.")
    columns: list = []
    funcs: list = []
    for name, aggfunc in kwargs.items():
        if not (isinstance(aggfunc, str) or callable(aggfunc)):
            raise TypeError(
                f"func is expected but received {type(aggfunc).__name__} in **kwargs."
            )
        columns.append(name)
        funcs.append(aggfunc)
    return columns, funcs