Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/pandas/core/arrays/_ranges.py: 9%
69 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1"""
2Helper functions to generate range-like data for DatetimeArray
3(and possibly TimedeltaArray/PeriodArray)
4"""
5from __future__ import annotations
7import numpy as np
9from pandas._libs.lib import i8max
10from pandas._libs.tslibs import (
11 BaseOffset,
12 OutOfBoundsDatetime,
13 Timedelta,
14 Timestamp,
15 iNaT,
16)
17from pandas._typing import npt
def generate_regular_range(
    start: Timestamp | Timedelta | None,
    end: Timestamp | Timedelta | None,
    periods: int | None,
    freq: BaseOffset,
) -> npt.NDArray[np.intp]:
    """
    Generate a range of dates or timestamps with the spans between dates
    described by the given `freq` DateOffset.

    The bounds handed to ``np.arange`` are derived from whichever inputs are
    present: ``start`` and ``end`` when ``periods`` is None, otherwise
    ``start`` and ``periods``, otherwise ``end`` and ``periods``.

    Parameters
    ----------
    start : Timedelta, Timestamp or None
        First point of produced date range.
    end : Timedelta, Timestamp or None
        Last point of produced date range.
    periods : int or None
        Number of periods in produced date range.
    freq : Tick
        Describes space between dates in produced date range.

    Returns
    -------
    ndarray[np.int64] Representing nanoseconds.

    Raises
    ------
    ValueError
        If none of the supported input combinations is provided.
    OutOfBoundsDatetime
        Propagated from ``_generate_range_overflow_safe`` when the second
        endpoint cannot be computed.
    """
    istart = start.value if start is not None else None
    iend = end.value if end is not None else None
    stride = freq.nanos

    if periods is None and istart is not None and iend is not None:
        b = istart
        # cannot just use e = Timestamp(end) + 1 because arange breaks when
        # stride is too large, see GH10887
        e = b + (iend - b) // stride * stride + stride // 2 + 1
    elif istart is not None and periods is not None:
        b = istart
        e = _generate_range_overflow_safe(b, periods, stride, side="start")
    elif iend is not None and periods is not None:
        # `e` is exclusive for np.arange, so step one stride past `end`
        e = iend + stride
        b = _generate_range_overflow_safe(e, periods, stride, side="end")
    else:
        raise ValueError(
            "at least 'start' or 'end' should be specified if a 'period' is given."
        )

    with np.errstate(over="raise"):
        # If the range is sufficiently large, np.arange may overflow
        # and incorrectly return an empty array if not caught.
        try:
            values = np.arange(b, e, stride, dtype=np.int64)
        except FloatingPointError:
            # Build the points with exact Python integers instead. `range`
            # yields b, b + stride, ... strictly before `e`, and always
            # terminates even when `e` is not an exact multiple of `stride`
            # away from `b` (an equality-based loop would never stop there).
            values = np.array(
                range(int(b), int(e), int(stride)), dtype=np.int64
            )
    return values
def _generate_range_overflow_safe(
    endpoint: int, periods: int, stride: int, side: str = "start"
) -> int:
    """
    Compute the other endpoint to hand to np.arange without silently
    wrapping around int64. Overflow detected while multiplying is re-raised
    as OutOfBoundsDatetime.

    Parameters
    ----------
    endpoint : int
        nanosecond timestamp of the known endpoint of the desired range
    periods : int
        number of periods in the desired range
    stride : int
        nanoseconds between periods in the desired range
    side : {'start', 'end'}
        which end of the range `endpoint` refers to

    Returns
    -------
    other_end : int

    Raises
    ------
    OutOfBoundsDatetime
    """
    # GH#14187 raise instead of incorrectly wrapping around
    assert side in ["start", "end"]

    int64_ceiling = np.uint64(i8max)
    error_text = f"Cannot generate range with {side}={endpoint} and periods={periods}"

    with np.errstate(over="raise"):
        # When periods * |stride| does not even fit in uint64 there is
        # nothing to salvage by splitting the work, so give up right away.
        try:
            span = np.uint64(periods) * np.uint64(np.abs(stride))
        except FloatingPointError as err:
            raise OutOfBoundsDatetime(error_text) from err

    if span <= int64_ceiling:
        # The whole span fits in int64: delegate to the signed helper.
        return _generate_range_overflow_safe_signed(endpoint, periods, stride, side)

    forward_from_positive = side == "start" and endpoint > 0
    backward_from_negative = side == "end" and endpoint < 0
    if stride > 0 and (forward_from_positive or backward_from_negative):
        # no chance of not-overflowing
        raise OutOfBoundsDatetime(error_text)

    if side == "end" and endpoint > int64_ceiling and endpoint - stride <= int64_ceiling:
        # generate_regular_range added `stride` to the end point, which
        # pushed it past the bounds; undo that step and drop one period.
        return _generate_range_overflow_safe(
            endpoint - stride, periods - 1, stride, side
        )

    # Otherwise cover the distance in two smaller hops.
    mid_periods = periods // 2
    remaining = periods - mid_periods
    assert 0 < remaining < periods, (remaining, periods, endpoint, stride)

    halfway = _generate_range_overflow_safe(endpoint, mid_periods, stride, side)
    return _generate_range_overflow_safe(halfway, remaining, stride, side)
def _generate_range_overflow_safe_signed(
    endpoint: int, periods: int, stride: int, side: str
) -> int:
    """
    Handle the case of _generate_range_overflow_safe in which
    `periods * stride` itself fits within int64 bounds.

    Returns `endpoint + periods * stride` (with the stride negated when
    `side` is "end"), or raises OutOfBoundsDatetime when that value cannot
    be used.
    """
    assert side in ["start", "end"]
    # From an "end" endpoint we walk in the opposite direction.
    step = stride * -1 if side == "end" else stride

    with np.errstate(over="raise"):
        delta = np.int64(periods) * np.int64(step)
        try:
            candidate = np.int64(endpoint) + delta
        except (FloatingPointError, OverflowError):
            # with endpoint negative and delta positive we risk
            # FloatingPointError; with reversed signs we risk OverflowError
            overflowed = True
        else:
            # A result equal to iNaT would be read back as NaT once stored
            # in a DatetimeArray/TimedeltaArray, so treat it as unusable.
            overflowed = candidate == iNaT

        if not overflowed:
            # error: Incompatible return value type (got "signedinteger[_64Bit]",
            # expected "int")
            return candidate  # type: ignore[return-value]

        # Opposite signs could never have overflowed, so by now `step` and
        # `endpoint` must point the same way.
        assert (step > 0 and endpoint >= 0) or (step < 0 and endpoint <= 0)

        if step > 0:
            # Very special case: the result lands just past the int64 bound,
            # yet np.arange (whose stop is exclusive) will still produce
            # values that sit inside the bounds.
            wrapped = np.uint64(endpoint) + np.uint64(delta)
            ceiling = np.uint64(i8max)
            assert wrapped > ceiling
            if wrapped <= ceiling + np.uint64(step):
                # error: Incompatible return value type (got "unsignedinteger",
                # expected "int")
                return wrapped  # type: ignore[return-value]

    raise OutOfBoundsDatetime(
        f"Cannot generate range with {side}={endpoint} and periods={periods}"
    )