Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/pandas/core/computation/scope.py: 28%
110 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1"""
2Module for scope operations
3"""
4from __future__ import annotations
6import datetime
7import inspect
8from io import StringIO
9import itertools
10import pprint
11import struct
12import sys
14import numpy as np
16from pandas._libs.tslibs import Timestamp
17from pandas.compat.chainmap import DeepChainMap
18from pandas.errors import UndefinedVariableError
21def ensure_scope(
22 level: int, global_dict=None, local_dict=None, resolvers=(), target=None, **kwargs
23) -> Scope:
24 """Ensure that we are grabbing the correct scope."""
25 return Scope(
26 level + 1,
27 global_dict=global_dict,
28 local_dict=local_dict,
29 resolvers=resolvers,
30 target=target,
31 )
34def _replacer(x) -> str:
35 """
36 Replace a number with its hexadecimal representation. Used to tag
37 temporary variables with their calling scope's id.
38 """
39 # get the hex repr of the binary char and remove 0x and pad by pad_size
40 # zeros
41 try:
42 hexin = ord(x)
43 except TypeError:
44 # bytes literals masquerade as ints when iterating in py3
45 hexin = x
47 return hex(hexin)
50def _raw_hex_id(obj) -> str:
51 """Return the padded hexadecimal id of ``obj``."""
52 # interpret as a pointer since that's what really what id returns
53 packed = struct.pack("@P", id(obj))
54 return "".join([_replacer(x) for x in packed])
57DEFAULT_GLOBALS = {
58 "Timestamp": Timestamp,
59 "datetime": datetime.datetime,
60 "True": True,
61 "False": False,
62 "list": list,
63 "tuple": tuple,
64 "inf": np.inf,
65 "Inf": np.inf,
66}
69def _get_pretty_string(obj) -> str:
70 """
71 Return a prettier version of obj.
73 Parameters
74 ----------
75 obj : object
76 Object to pretty print
78 Returns
79 -------
80 str
81 Pretty print object repr
82 """
83 sio = StringIO()
84 pprint.pprint(obj, stream=sio)
85 return sio.getvalue()
88class Scope:
89 """
90 Object to hold scope, with a few bells to deal with some custom syntax
91 and contexts added by pandas.
93 Parameters
94 ----------
95 level : int
96 global_dict : dict or None, optional, default None
97 local_dict : dict or Scope or None, optional, default None
98 resolvers : list-like or None, optional, default None
99 target : object
101 Attributes
102 ----------
103 level : int
104 scope : DeepChainMap
105 target : object
106 temps : dict
107 """
109 __slots__ = ["level", "scope", "target", "resolvers", "temps"]
110 level: int
111 scope: DeepChainMap
112 resolvers: DeepChainMap
113 temps: dict
115 def __init__(
116 self, level: int, global_dict=None, local_dict=None, resolvers=(), target=None
117 ) -> None:
118 self.level = level + 1
120 # shallow copy because we don't want to keep filling this up with what
121 # was there before if there are multiple calls to Scope/_ensure_scope
122 self.scope = DeepChainMap(DEFAULT_GLOBALS.copy())
123 self.target = target
125 if isinstance(local_dict, Scope):
126 self.scope.update(local_dict.scope)
127 if local_dict.target is not None:
128 self.target = local_dict.target
129 self._update(local_dict.level)
131 frame = sys._getframe(self.level)
133 try:
134 # shallow copy here because we don't want to replace what's in
135 # scope when we align terms (alignment accesses the underlying
136 # numpy array of pandas objects)
137 scope_global = self.scope.new_child(
138 (global_dict if global_dict is not None else frame.f_globals).copy()
139 )
140 self.scope = DeepChainMap(scope_global)
141 if not isinstance(local_dict, Scope):
142 scope_local = self.scope.new_child(
143 (local_dict if local_dict is not None else frame.f_locals).copy()
144 )
145 self.scope = DeepChainMap(scope_local)
146 finally:
147 del frame
149 # assumes that resolvers are going from outermost scope to inner
150 if isinstance(local_dict, Scope):
151 resolvers += tuple(local_dict.resolvers.maps)
152 self.resolvers = DeepChainMap(*resolvers)
153 self.temps = {}
155 def __repr__(self) -> str:
156 scope_keys = _get_pretty_string(list(self.scope.keys()))
157 res_keys = _get_pretty_string(list(self.resolvers.keys()))
158 return f"{type(self).__name__}(scope={scope_keys}, resolvers={res_keys})"
160 @property
161 def has_resolvers(self) -> bool:
162 """
163 Return whether we have any extra scope.
165 For example, DataFrames pass Their columns as resolvers during calls to
166 ``DataFrame.eval()`` and ``DataFrame.query()``.
168 Returns
169 -------
170 hr : bool
171 """
172 return bool(len(self.resolvers))
174 def resolve(self, key: str, is_local: bool):
175 """
176 Resolve a variable name in a possibly local context.
178 Parameters
179 ----------
180 key : str
181 A variable name
182 is_local : bool
183 Flag indicating whether the variable is local or not (prefixed with
184 the '@' symbol)
186 Returns
187 -------
188 value : object
189 The value of a particular variable
190 """
191 try:
192 # only look for locals in outer scope
193 if is_local:
194 return self.scope[key]
196 # not a local variable so check in resolvers if we have them
197 if self.has_resolvers:
198 return self.resolvers[key]
200 # if we're here that means that we have no locals and we also have
201 # no resolvers
202 assert not is_local and not self.has_resolvers
203 return self.scope[key]
204 except KeyError:
205 try:
206 # last ditch effort we look in temporaries
207 # these are created when parsing indexing expressions
208 # e.g., df[df > 0]
209 return self.temps[key]
210 except KeyError as err:
211 raise UndefinedVariableError(key, is_local) from err
213 def swapkey(self, old_key: str, new_key: str, new_value=None) -> None:
214 """
215 Replace a variable name, with a potentially new value.
217 Parameters
218 ----------
219 old_key : str
220 Current variable name to replace
221 new_key : str
222 New variable name to replace `old_key` with
223 new_value : object
224 Value to be replaced along with the possible renaming
225 """
226 if self.has_resolvers:
227 maps = self.resolvers.maps + self.scope.maps
228 else:
229 maps = self.scope.maps
231 maps.append(self.temps)
233 for mapping in maps:
234 if old_key in mapping:
235 mapping[new_key] = new_value
236 return
238 def _get_vars(self, stack, scopes: list[str]) -> None:
239 """
240 Get specifically scoped variables from a list of stack frames.
242 Parameters
243 ----------
244 stack : list
245 A list of stack frames as returned by ``inspect.stack()``
246 scopes : sequence of strings
247 A sequence containing valid stack frame attribute names that
248 evaluate to a dictionary. For example, ('locals', 'globals')
249 """
250 variables = itertools.product(scopes, stack)
251 for scope, (frame, _, _, _, _, _) in variables:
252 try:
253 d = getattr(frame, "f_" + scope)
254 self.scope = DeepChainMap(self.scope.new_child(d))
255 finally:
256 # won't remove it, but DECREF it
257 # in Py3 this probably isn't necessary since frame won't be
258 # scope after the loop
259 del frame
261 def _update(self, level: int) -> None:
262 """
263 Update the current scope by going back `level` levels.
265 Parameters
266 ----------
267 level : int
268 """
269 sl = level + 1
271 # add sl frames to the scope starting with the
272 # most distant and overwriting with more current
273 # makes sure that we can capture variable scope
274 stack = inspect.stack()
276 try:
277 self._get_vars(stack[:sl], scopes=["locals"])
278 finally:
279 del stack[:], stack
281 def add_tmp(self, value) -> str:
282 """
283 Add a temporary variable to the scope.
285 Parameters
286 ----------
287 value : object
288 An arbitrary object to be assigned to a temporary variable.
290 Returns
291 -------
292 str
293 The name of the temporary variable created.
294 """
295 name = f"{type(value).__name__}_{self.ntemps}_{_raw_hex_id(self)}"
297 # add to inner most scope
298 assert name not in self.temps
299 self.temps[name] = value
300 assert name in self.temps
302 # only increment if the variable gets put in the scope
303 return name
305 @property
306 def ntemps(self) -> int:
307 """The number of temporary variables in this scope"""
308 return len(self.temps)
310 @property
311 def full_scope(self) -> DeepChainMap:
312 """
313 Return the full scope for use with passing to engines transparently
314 as a mapping.
316 Returns
317 -------
318 vars : DeepChainMap
319 All variables in this scope.
320 """
321 # error: Unsupported operand types for + ("List[Dict[Any, Any]]" and
322 # "List[Mapping[Any, Any]]")
323 # error: Unsupported operand types for + ("List[Dict[Any, Any]]" and
324 # "List[Mapping[str, Any]]")
325 maps = (
326 [self.temps]
327 + self.resolvers.maps # type: ignore[operator]
328 + self.scope.maps # type: ignore[operator]
329 )
330 return DeepChainMap(*maps)