Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/pandas/core/reshape/pivot.py: 6%
360 statements
« prev ^ index » next — coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1from __future__ import annotations
3from typing import (
4 TYPE_CHECKING,
5 Callable,
6 Hashable,
7 Sequence,
8 cast,
9)
11import numpy as np
13from pandas._typing import (
14 AggFuncType,
15 AggFuncTypeBase,
16 AggFuncTypeDict,
17 IndexLabel,
18)
19from pandas.util._decorators import (
20 Appender,
21 Substitution,
22 deprecate_nonkeyword_arguments,
23)
24from pandas.util._exceptions import rewrite_warning
26from pandas.core.dtypes.cast import maybe_downcast_to_dtype
27from pandas.core.dtypes.common import (
28 is_integer_dtype,
29 is_list_like,
30 is_nested_list_like,
31 is_scalar,
32)
33from pandas.core.dtypes.generic import (
34 ABCDataFrame,
35 ABCSeries,
36)
38import pandas.core.common as com
39from pandas.core.frame import _shared_docs
40from pandas.core.groupby import Grouper
41from pandas.core.indexes.api import (
42 Index,
43 MultiIndex,
44 get_objs_combined_axis,
45)
46from pandas.core.reshape.concat import concat
47from pandas.core.reshape.util import cartesian_product
48from pandas.core.series import Series
if TYPE_CHECKING:
    from pandas import DataFrame
54# Note: We need to make sure `frame` is imported before `pivot`, otherwise
55# _shared_docs['pivot_table'] will not yet exist. TODO: Fix this dependency
# Note: We need to make sure `frame` is imported before `pivot`, otherwise
# _shared_docs['pivot_table'] will not yet exist. TODO: Fix this dependency
@Substitution("\ndata : DataFrame")
@Appender(_shared_docs["pivot_table"], indents=1)
def pivot_table(
    data: DataFrame,
    values=None,
    index=None,
    columns=None,
    aggfunc: AggFuncType = "mean",
    fill_value=None,
    margins: bool = False,
    dropna: bool = True,
    margins_name: str = "All",
    observed: bool = False,
    sort: bool = True,
) -> DataFrame:
    """
    Create a spreadsheet-style pivot table (shared docstring appended by
    the decorators).  A list ``aggfunc`` produces one sub-table per
    function, concatenated under a new top column level.
    """
    # Normalize the grouping specs into lists of group keys.
    index = _convert_by(index)
    columns = _convert_by(columns)

    if isinstance(aggfunc, list):
        # One sub-table per aggregation function; the function's __name__
        # (or the function itself, e.g. a string) labels its column level.
        sub_tables: list[DataFrame] = []
        func_labels = []
        for func in aggfunc:
            sub_tables.append(
                __internal_pivot_table(
                    data,
                    values=values,
                    index=index,
                    columns=columns,
                    fill_value=fill_value,
                    aggfunc=func,
                    margins=margins,
                    dropna=dropna,
                    margins_name=margins_name,
                    observed=observed,
                    sort=sort,
                )
            )
            func_labels.append(getattr(func, "__name__", func))
        combined = concat(sub_tables, keys=func_labels, axis=1)
        return combined.__finalize__(data, method="pivot_table")

    result = __internal_pivot_table(
        data,
        values,
        index,
        columns,
        aggfunc,
        fill_value,
        margins,
        dropna,
        margins_name,
        observed,
        sort,
    )
    return result.__finalize__(data, method="pivot_table")
def __internal_pivot_table(
    data: DataFrame,
    values,
    index,
    columns,
    aggfunc: AggFuncTypeBase | AggFuncTypeDict,
    fill_value,
    margins: bool,
    dropna: bool,
    margins_name: str,
    observed: bool,
    sort: bool,
) -> DataFrame:
    """
    Helper of :func:`pandas.pivot_table` for any non-list ``aggfunc``.

    ``index`` and ``columns`` are assumed to already be lists of group keys
    (see ``_convert_by``); together they form the groupby keys.
    """
    keys = index + columns

    values_passed = values is not None
    if values_passed:
        # Normalize ``values`` to a list and remember whether the caller
        # asked for multiple value columns (controls the final droplevel).
        if is_list_like(values):
            values_multi = True
            values = list(values)
        else:
            values_multi = False
            values = [values]

        # GH14938 Make sure value labels are in data
        for i in values:
            if i not in data:
                raise KeyError(i)

        # Restrict ``data`` to the columns actually needed (group keys +
        # value columns).  Unhashable entries (e.g. arrays/Groupers without
        # a key in data) raise TypeError on the ``in data`` test and are
        # skipped rather than filtered on.
        to_filter = []
        for x in keys + values:
            if isinstance(x, Grouper):
                x = x.key
            try:
                if x in data:
                    to_filter.append(x)
            except TypeError:
                pass
        if len(to_filter) < len(data.columns):
            data = data[to_filter]

    else:
        # No explicit values: aggregate every column that is not a group key.
        values = data.columns
        for key in keys:
            try:
                values = values.drop(key)
            except (TypeError, ValueError, KeyError):
                pass
        values = list(values)

    grouped = data.groupby(keys, observed=observed, sort=sort)
    msg = (
        "pivot_table dropped a column because it failed to aggregate. This behavior "
        "is deprecated and will raise in a future version of pandas. Select only the "
        "columns that can be aggregated."
    )
    # Re-brand the generic ``numeric_only`` FutureWarning coming out of the
    # groupby aggregation as a pivot_table-specific deprecation message.
    with rewrite_warning(
        target_message="The default value of numeric_only",
        target_category=FutureWarning,
        new_message=msg,
    ):
        agged = grouped.agg(aggfunc)

    if dropna and isinstance(agged, ABCDataFrame) and len(agged.columns):
        agged = agged.dropna(how="all")

        # gh-21133
        # we want to down cast if
        # the original values are ints
        # as we grouped with a NaN value
        # and then dropped, coercing to floats
        for v in values:
            if (
                v in data
                and is_integer_dtype(data[v])
                and v in agged
                and not is_integer_dtype(agged[v])
            ):
                if not isinstance(agged[v], ABCDataFrame) and isinstance(
                    data[v].dtype, np.dtype
                ):
                    # exclude DataFrame case bc maybe_downcast_to_dtype expects
                    # ArrayLike
                    # e.g. test_pivot_table_multiindex_columns_doctest_case
                    # agged.columns is a MultiIndex and 'v' is indexing only
                    # on its first level.
                    agged[v] = maybe_downcast_to_dtype(agged[v], data[v].dtype)

    table = agged

    # GH17038, this check should only happen if index is defined (not None)
    if table.index.nlevels > 1 and index:
        # Related GH #17123
        # If index_names are integers, determine whether the integers refer
        # to the level position or name.
        index_names = agged.index.names[: len(index)]
        to_unstack = []
        for i in range(len(index), len(keys)):
            name = agged.index.names[i]
            if name is None or name in index_names:
                to_unstack.append(i)
            else:
                to_unstack.append(name)
        # Move the ``columns`` group keys from the row index to the columns.
        table = agged.unstack(to_unstack)

    if not dropna:
        # Reindex against the full cartesian product of the index/column
        # levels so combinations absent from the data still appear as NaN.
        if isinstance(table.index, MultiIndex):
            m = MultiIndex.from_arrays(
                cartesian_product(table.index.levels), names=table.index.names
            )
            table = table.reindex(m, axis=0)

        if isinstance(table.columns, MultiIndex):
            m = MultiIndex.from_arrays(
                cartesian_product(table.columns.levels), names=table.columns.names
            )
            table = table.reindex(m, axis=1)

    if sort is True and isinstance(table, ABCDataFrame):
        table = table.sort_index(axis=1)

    if fill_value is not None:
        table = table.fillna(fill_value, downcast="infer")

    if margins:
        if dropna:
            # Margins are computed from fully-observed rows only.
            data = data[data.notna().all(axis=1)]
        table = _add_margins(
            table,
            data,
            values,
            rows=index,
            cols=columns,
            aggfunc=aggfunc,
            observed=dropna,
            margins_name=margins_name,
            fill_value=fill_value,
        )

    # discard the top level
    if values_passed and not values_multi and table.columns.nlevels > 1:
        table = table.droplevel(0, axis=1)
    if len(index) == 0 and len(columns) > 0:
        table = table.T

    # GH 15193 Make sure empty columns are removed if dropna=True
    if isinstance(table, ABCDataFrame) and dropna:
        table = table.dropna(how="all", axis=1)

    return table
def _add_margins(
    table: DataFrame | Series,
    data: DataFrame,
    values,
    rows,
    cols,
    aggfunc,
    observed=None,
    margins_name: str = "All",
    fill_value=None,
):
    """
    Append margin (subtotal / grand-total) rows and columns to ``table``.

    ``rows``/``cols`` are the pivot's index/column group keys and ``data``
    is the source frame used to compute margins with ``aggfunc``.

    Raises
    ------
    ValueError
        If ``margins_name`` is not a string or collides with an existing
        index/column label.
    """
    if not isinstance(margins_name, str):
        raise ValueError("margins_name argument must be a string")

    msg = f'Conflicting name "{margins_name}" in margins'
    for level in table.index.names:
        if margins_name in table.index.get_level_values(level):
            raise ValueError(msg)

    # Grand total(s) over the whole of ``data`` — one entry per value column.
    grand_margin = _compute_grand_margin(data, values, aggfunc, margins_name)

    if table.ndim == 2:
        # i.e. DataFrame
        for level in table.columns.names[1:]:
            if margins_name in table.columns.get_level_values(level):
                raise ValueError(msg)

    # Row label for the margin row; padded with "" for extra index levels.
    key: str | tuple[str, ...]
    if len(rows) > 1:
        key = (margins_name,) + ("",) * (len(rows) - 1)
    else:
        key = margins_name

    if not values and isinstance(table, ABCSeries):
        # If there are no values and the table is a series, then there is only
        # one column in the data. Compute grand margin and return it.
        return table._append(Series({key: grand_margin[margins_name]}))

    elif values:
        marginal_result_set = _generate_marginal_results(
            table, data, values, rows, cols, aggfunc, observed, margins_name
        )
        # A non-tuple result means the helper already produced the final
        # table (no row margin left to attach here).
        if not isinstance(marginal_result_set, tuple):
            return marginal_result_set
        result, margin_keys, row_margin = marginal_result_set
    else:
        # no values, and table is a DataFrame
        assert isinstance(table, ABCDataFrame)
        marginal_result_set = _generate_marginal_results_without_values(
            table, data, rows, cols, aggfunc, observed, margins_name
        )
        if not isinstance(marginal_result_set, tuple):
            return marginal_result_set
        result, margin_keys, row_margin = marginal_result_set

    row_margin = row_margin.reindex(result.columns, fill_value=fill_value)
    # populate grand margin
    for k in margin_keys:
        if isinstance(k, str):
            row_margin[k] = grand_margin[k]
        else:
            row_margin[k] = grand_margin[k[0]]

    from pandas import DataFrame

    margin_dummy = DataFrame(row_margin, columns=Index([key])).T

    row_names = result.index.names
    # check the result column and leave floats
    for dtype in set(result.dtypes):
        # NOTE(review): this rebinds the ``cols`` parameter; from here on it
        # holds the columns of ``result`` with the current dtype.
        cols = result.select_dtypes([dtype]).columns
        margin_dummy[cols] = margin_dummy[cols].apply(
            maybe_downcast_to_dtype, args=(dtype,)
        )
    result = result._append(margin_dummy)
    result.index.names = row_names

    return result
348def _compute_grand_margin(data: DataFrame, values, aggfunc, margins_name: str = "All"):
350 if values:
351 grand_margin = {}
352 for k, v in data[values].items():
353 try:
354 if isinstance(aggfunc, str):
355 grand_margin[k] = getattr(v, aggfunc)()
356 elif isinstance(aggfunc, dict):
357 if isinstance(aggfunc[k], str):
358 grand_margin[k] = getattr(v, aggfunc[k])()
359 else:
360 grand_margin[k] = aggfunc[k](v)
361 else:
362 grand_margin[k] = aggfunc(v)
363 except TypeError:
364 pass
365 return grand_margin
366 else:
367 return {margins_name: aggfunc(data.index)}
def _generate_marginal_results(
    table, data, values, rows, cols, aggfunc, observed, margins_name: str = "All"
):
    """
    Build the column margins interleaved into ``table`` plus the row margin.

    Returns either a finished table (when there are columns but no rows) or
    a ``(result, margin_keys, row_margin)`` tuple consumed by
    ``_add_margins``.
    """
    if len(cols) > 0:
        # need to "interleave" the margins
        table_pieces = []
        margin_keys = []

        def _all_key(key):
            # Column label for a margin column, padded with "" so it fits
            # the table's column MultiIndex depth.
            return (key, margins_name) + ("",) * (len(cols) - 1)

        if len(rows) > 0:
            # Margin per row-group: aggregate values over the row keys only.
            margin = data[rows + values].groupby(rows, observed=observed).agg(aggfunc)
            cat_axis = 1

            # Group the table by its top column level and append the
            # matching margin column to each piece.
            for key, piece in table.groupby(level=0, axis=cat_axis, observed=observed):
                all_key = _all_key(key)

                # we are going to mutate this, so need to copy!
                piece = piece.copy()
                piece[all_key] = margin[key]

                table_pieces.append(piece)
                margin_keys.append(all_key)
        else:
            from pandas import DataFrame

            cat_axis = 0
            for key, piece in table.groupby(level=0, axis=cat_axis, observed=observed):
                if len(cols) > 1:
                    all_key = _all_key(key)
                else:
                    all_key = margins_name
                table_pieces.append(piece)
                # GH31016 this is to calculate margin for each group, and assign
                # corresponded key as index
                transformed_piece = DataFrame(piece.apply(aggfunc)).T
                transformed_piece.index = Index([all_key], name=piece.index.name)

                # append piece for margin into table_piece
                table_pieces.append(transformed_piece)
                margin_keys.append(all_key)

        result = concat(table_pieces, axis=cat_axis)

        if len(rows) == 0:
            # No row keys: the interleaved result is already complete.
            return result
    else:
        result = table
        margin_keys = table.columns

    if len(cols) > 0:
        # Row margin: aggregate over the column keys, then stack so the
        # value label becomes the leading index level.
        row_margin = data[cols + values].groupby(cols, observed=observed).agg(aggfunc)
        row_margin = row_margin.stack()

        # slight hack
        new_order = [len(cols)] + list(range(len(cols)))
        row_margin.index = row_margin.index.reorder_levels(new_order)
    else:
        row_margin = Series(np.nan, index=result.columns)

    return result, margin_keys, row_margin
def _generate_marginal_results_without_values(
    table: DataFrame, data, rows, cols, aggfunc, observed, margins_name: str = "All"
):
    """
    Counterpart of ``_generate_marginal_results`` for the no-``values`` case.

    NOTE: mutates ``table`` in place by assigning the margin column.
    Returns either a finished table (columns but no rows) or a
    ``(result, margin_keys, row_margin)`` tuple for ``_add_margins``.
    """
    if len(cols) > 0:
        # need to "interleave" the margins
        margin_keys: list | Index = []

        def _all_key():
            # Margin column label, padded with "" to the column depth.
            if len(cols) == 1:
                return margins_name
            return (margins_name,) + ("",) * (len(cols) - 1)

        if len(rows) > 0:
            margin = data[rows].groupby(rows, observed=observed).apply(aggfunc)
            all_key = _all_key()
            table[all_key] = margin
            result = table
            margin_keys.append(all_key)

        else:
            margin = data.groupby(level=0, axis=0, observed=observed).apply(aggfunc)
            all_key = _all_key()
            table[all_key] = margin
            result = table
            margin_keys.append(all_key)
            # No row keys: nothing further to margin, return directly.
            return result
    else:
        result = table
        margin_keys = table.columns

    if len(cols):
        row_margin = data[cols].groupby(cols, observed=observed).apply(aggfunc)
    else:
        row_margin = Series(np.nan, index=result.columns)

    return result, margin_keys, row_margin
472def _convert_by(by):
473 if by is None:
474 by = []
475 elif (
476 is_scalar(by)
477 or isinstance(by, (np.ndarray, Index, ABCSeries, Grouper))
478 or callable(by)
479 ):
480 by = [by]
481 else:
482 by = list(by)
483 return by
@Substitution("\ndata : DataFrame")
@Appender(_shared_docs["pivot"], indents=1)
@deprecate_nonkeyword_arguments(version=None, allowed_args=["data"])
def pivot(
    data: DataFrame,
    index: IndexLabel | None = None,
    columns: IndexLabel | None = None,
    values: IndexLabel | None = None,
) -> DataFrame:
    """
    Reshape ``data`` into a wide frame keyed by ``index``/``columns``
    (full contract in the shared docstring appended by the decorators).
    Builds an intermediate indexed object, then unstacks the column labels.
    """
    # ``columns`` is mandatory even though the signature defaults it to
    # None (kept for the positional-argument deprecation cycle).
    if columns is None:
        raise TypeError("pivot() missing 1 required argument: 'columns'")

    column_labels = com.convert_to_list_like(columns)

    indexed: DataFrame | Series
    if values is None:
        # No explicit values: push the index/column labels into the row
        # index and let unstack spread all remaining columns.
        index_labels = [] if index is None else com.convert_to_list_like(index)
        # When no index was given, append to the existing index instead of
        # replacing it.
        # error: Unsupported operand types for + ("List[Any]" and "ExtensionArray")
        # error: Unsupported left operand type for + ("ExtensionArray")
        indexed = data.set_index(
            index_labels + column_labels, append=index is None  # type: ignore[operator]
        )
    else:
        # Build the row MultiIndex from the requested index columns (or the
        # existing index) plus the pivot column labels.
        if index is not None:
            level_arrays = [data[lbl] for lbl in com.convert_to_list_like(index)]
        elif isinstance(data.index, MultiIndex):
            # GH 23955: preserve every level of an existing MultiIndex.
            level_arrays = [
                data.index.get_level_values(lvl) for lvl in range(data.index.nlevels)
            ]
        else:
            level_arrays = [Series(data.index, name=data.index.name)]

        level_arrays.extend(data[col] for col in column_labels)
        row_index = MultiIndex.from_arrays(level_arrays)

        if is_list_like(values) and not isinstance(values, tuple):
            # Exclude tuple because it is seen as a single column name
            values = cast(Sequence[Hashable], values)
            indexed = data._constructor(
                data[values]._values, index=row_index, columns=values
            )
        else:
            indexed = data._constructor_sliced(data[values]._values, index=row_index)
    # error: Argument 1 to "unstack" of "DataFrame" has incompatible type "Union
    # [List[Any], ExtensionArray, ndarray[Any, Any], Index, Series]"; expected
    # "Hashable"
    return indexed.unstack(column_labels)  # type: ignore[arg-type]
def crosstab(
    index,
    columns,
    values=None,
    rownames=None,
    colnames=None,
    aggfunc=None,
    margins: bool = False,
    margins_name: str = "All",
    dropna: bool = True,
    normalize=False,
) -> DataFrame:
    """
    Compute a simple cross tabulation of two (or more) factors.

    By default, computes a frequency table of the factors unless an
    array of values and an aggregation function are passed.

    Parameters
    ----------
    index : array-like, Series, or list of arrays/Series
        Values to group by in the rows.
    columns : array-like, Series, or list of arrays/Series
        Values to group by in the columns.
    values : array-like, optional
        Array of values to aggregate according to the factors.
        Requires `aggfunc` be specified.
    rownames : sequence, default None
        If passed, must match number of row arrays passed.
    colnames : sequence, default None
        If passed, must match number of column arrays passed.
    aggfunc : function, optional
        If specified, requires `values` be specified as well.
    margins : bool, default False
        Add row/column margins (subtotals).
    margins_name : str, default 'All'
        Name of the row/column that will contain the totals
        when margins is True.
    dropna : bool, default True
        Do not include columns whose entries are all NaN.
    normalize : bool, {'all', 'index', 'columns'}, or {0,1}, default False
        Normalize by dividing all values by the sum of values.

        - If passed 'all' or `True`, will normalize over all values.
        - If passed 'index' will normalize over each row.
        - If passed 'columns' will normalize over each column.
        - If margins is `True`, will also normalize margin values.

    Returns
    -------
    DataFrame
        Cross tabulation of the data.

    See Also
    --------
    DataFrame.pivot : Reshape data based on column values.
    pivot_table : Create a pivot table as a DataFrame.

    Notes
    -----
    Any Series passed will have their name attributes used unless row or column
    names for the cross-tabulation are specified.

    Any input passed containing Categorical data will have **all** of its
    categories included in the cross-tabulation, even if the actual data does
    not contain any instances of a particular category.

    In the event that there aren't overlapping indexes an empty DataFrame will
    be returned.

    Reference :ref:`the user guide <reshaping.crosstabulations>` for more examples.

    Examples
    --------
    >>> a = np.array(["foo", "foo", "foo", "foo", "bar", "bar",
    ...               "bar", "bar", "foo", "foo", "foo"], dtype=object)
    >>> b = np.array(["one", "one", "one", "two", "one", "one",
    ...               "one", "two", "two", "two", "one"], dtype=object)
    >>> c = np.array(["dull", "dull", "shiny", "dull", "dull", "shiny",
    ...               "shiny", "dull", "shiny", "shiny", "shiny"],
    ...              dtype=object)
    >>> pd.crosstab(a, [b, c], rownames=['a'], colnames=['b', 'c'])
    b   one        two
    c   dull shiny dull shiny
    a
    bar    1     2    1     0
    foo    2     2    1     2

    Here 'c' and 'f' are not represented in the data and will not be
    shown in the output because dropna is True by default. Set
    dropna=False to preserve categories with no data.

    >>> foo = pd.Categorical(['a', 'b'], categories=['a', 'b', 'c'])
    >>> bar = pd.Categorical(['d', 'e'], categories=['d', 'e', 'f'])
    >>> pd.crosstab(foo, bar)
    col_0  d  e
    row_0
    a      1  0
    b      0  1
    >>> pd.crosstab(foo, bar, dropna=False)
    col_0  d  e  f
    row_0
    a      1  0  0
    b      0  1  0
    c      0  0  0
    """
    # values and aggfunc must be supplied together.
    if values is None and aggfunc is not None:
        raise ValueError("aggfunc cannot be used without values.")

    if values is not None and aggfunc is None:
        raise ValueError("values cannot be used without an aggfunc.")

    # Wrap single factors so both inputs are lists of arrays/Series.
    if not is_nested_list_like(index):
        index = [index]
    if not is_nested_list_like(columns):
        columns = [columns]

    # Align any Series/DataFrame inputs on their common index.
    common_idx = None
    pass_objs = [x for x in index + columns if isinstance(x, (ABCSeries, ABCDataFrame))]
    if pass_objs:
        common_idx = get_objs_combined_axis(pass_objs, intersect=True, sort=False)

    rownames = _get_names(index, rownames, prefix="row")
    colnames = _get_names(columns, colnames, prefix="col")

    # duplicate names mapped to unique names for pivot op
    (
        rownames_mapper,
        unique_rownames,
        colnames_mapper,
        unique_colnames,
    ) = _build_names_mapper(rownames, colnames)

    from pandas import DataFrame

    # Assemble one frame holding every factor column under its unique name.
    data = {
        **dict(zip(unique_rownames, index)),
        **dict(zip(unique_colnames, columns)),
    }
    df = DataFrame(data, index=common_idx)

    if values is None:
        # Frequency table: count a constant dummy column.
        df["__dummy__"] = 0
        kwargs = {"aggfunc": len, "fill_value": 0}
    else:
        df["__dummy__"] = values
        kwargs = {"aggfunc": aggfunc}

    table = df.pivot_table(
        "__dummy__",
        index=unique_rownames,
        columns=unique_colnames,
        margins=margins,
        margins_name=margins_name,
        dropna=dropna,
        **kwargs,
    )

    # Post-process
    if normalize is not False:
        table = _normalize(
            table, normalize=normalize, margins=margins, margins_name=margins_name
        )

    # Restore the original (possibly duplicated) axis names.
    table = table.rename_axis(index=rownames_mapper, axis=0)
    table = table.rename_axis(columns=colnames_mapper, axis=1)

    return table
def _normalize(
    table: DataFrame, normalize, margins: bool, margins_name="All"
) -> DataFrame:
    """
    Normalize a crosstab ``table`` over all values, rows, or columns.

    ``normalize`` may be True/'all', 'index', 'columns', or the axis
    numbers 0/1 (mapped to 'index'/'columns').  When ``margins`` is True,
    the core table (excluding the margin row/column) is normalized
    recursively and the margins are re-attached, normalized themselves.

    Raises
    ------
    ValueError
        For an unrecognized ``normalize`` or ``margins`` argument, or when
        ``margins_name`` is not found in the pivoted table.
    """
    if not isinstance(normalize, (bool, str)):
        axis_subs = {0: "index", 1: "columns"}
        try:
            normalize = axis_subs[normalize]
        except KeyError as err:
            raise ValueError("Not a valid normalize argument") from err

    if margins is False:

        # Actual Normalizations
        normalizers: dict[bool | str, Callable] = {
            "all": lambda x: x / x.sum(axis=1).sum(axis=0),
            "columns": lambda x: x / x.sum(),
            "index": lambda x: x.div(x.sum(axis=1), axis=0),
        }

        normalizers[True] = normalizers["all"]

        try:
            f = normalizers[normalize]
        except KeyError as err:
            raise ValueError("Not a valid normalize argument") from err

        table = f(table)
        # 0/0 divisions become NaN; present them as 0.
        table = table.fillna(0)

    elif margins is True:
        # keep index and column of pivoted table
        table_index = table.index
        table_columns = table.columns
        last_ind_or_col = table.iloc[-1, :].name

        # check if margin name is not in (for MI cases) and not equal to last
        # index/column and save the column and index margin
        if (margins_name not in last_ind_or_col) & (margins_name != last_ind_or_col):
            raise ValueError(f"{margins_name} not in pivoted DataFrame")
        column_margin = table.iloc[:-1, -1]
        index_margin = table.iloc[-1, :-1]

        # keep the core table
        table = table.iloc[:-1, :-1]

        # Normalize core
        table = _normalize(table, normalize=normalize, margins=False)

        # Fix Margins
        if normalize == "columns":
            column_margin = column_margin / column_margin.sum()
            table = concat([table, column_margin], axis=1)
            table = table.fillna(0)
            table.columns = table_columns

        elif normalize == "index":
            index_margin = index_margin / index_margin.sum()
            table = table._append(index_margin)
            table = table.fillna(0)
            table.index = table_index

        elif normalize == "all" or normalize is True:
            column_margin = column_margin / column_margin.sum()
            index_margin = index_margin / index_margin.sum()
            # The grand-total cell of a fully normalized table is 1.
            index_margin.loc[margins_name] = 1
            table = concat([table, column_margin], axis=1)
            table = table._append(index_margin)

            table = table.fillna(0)
            table.index = table_index
            table.columns = table_columns

        else:
            raise ValueError("Not a valid normalize argument")

    else:
        raise ValueError("Not a valid margins argument")

    return table
795def _get_names(arrs, names, prefix: str = "row"):
796 if names is None:
797 names = []
798 for i, arr in enumerate(arrs):
799 if isinstance(arr, ABCSeries) and arr.name is not None:
800 names.append(arr.name)
801 else:
802 names.append(f"{prefix}_{i}")
803 else:
804 if len(names) != len(arrs):
805 raise AssertionError("arrays and names must have the same length")
806 if not isinstance(names, list):
807 names = list(names)
809 return names
812def _build_names_mapper(
813 rownames: list[str], colnames: list[str]
814) -> tuple[dict[str, str], list[str], dict[str, str], list[str]]:
815 """
816 Given the names of a DataFrame's rows and columns, returns a set of unique row
817 and column names and mappers that convert to original names.
819 A row or column name is replaced if it is duplicate among the rows of the inputs,
820 among the columns of the inputs or between the rows and the columns.
822 Parameters
823 ----------
824 rownames: list[str]
825 colnames: list[str]
827 Returns
828 -------
829 Tuple(Dict[str, str], List[str], Dict[str, str], List[str])
831 rownames_mapper: dict[str, str]
832 a dictionary with new row names as keys and original rownames as values
833 unique_rownames: list[str]
834 a list of rownames with duplicate names replaced by dummy names
835 colnames_mapper: dict[str, str]
836 a dictionary with new column names as keys and original column names as values
837 unique_colnames: list[str]
838 a list of column names with duplicate names replaced by dummy names
840 """
842 def get_duplicates(names):
843 seen: set = set()
844 return {name for name in names if name not in seen}
846 shared_names = set(rownames).intersection(set(colnames))
847 dup_names = get_duplicates(rownames) | get_duplicates(colnames) | shared_names
849 rownames_mapper = {
850 f"row_{i}": name for i, name in enumerate(rownames) if name in dup_names
851 }
852 unique_rownames = [
853 f"row_{i}" if name in dup_names else name for i, name in enumerate(rownames)
854 ]
856 colnames_mapper = {
857 f"col_{i}": name for i, name in enumerate(colnames) if name in dup_names
858 }
859 unique_colnames = [
860 f"col_{i}" if name in dup_names else name for i, name in enumerate(colnames)
861 ]
863 return rownames_mapper, unique_rownames, colnames_mapper, unique_colnames