Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/faker/proxy.py: 33%
170 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1import copy
2import functools
3import re
5from collections import OrderedDict
6from random import Random
7from typing import Any, Callable, Dict, List, Optional, Pattern, Sequence, Tuple, Union
9from .config import DEFAULT_LOCALE
10from .exceptions import UniquenessException
11from .factory import Factory
12from .generator import Generator, random
13from .typing import SeedType
14from .utils.distribution import choices_distribution
16_UNIQUE_ATTEMPTS = 1000
19class Faker:
20 """Proxy class capable of supporting multiple locales"""
22 cache_pattern: Pattern = re.compile(r"^_cached_\w*_mapping$")
23 generator_attrs = [
24 attr for attr in dir(Generator) if not attr.startswith("__") and attr not in ["seed", "seed_instance", "random"]
25 ]
27 def __init__(
28 self,
29 locale: Optional[Union[str, Sequence[str], Dict[str, Union[int, float]]]] = None,
30 providers: Optional[List[str]] = None,
31 generator: Optional[Generator] = None,
32 includes: Optional[List[str]] = None,
33 use_weighting: bool = True,
34 **config: Any,
35 ) -> None:
36 self._factory_map = OrderedDict()
37 self._weights = None
38 self._unique_proxy = UniqueProxy(self)
40 if isinstance(locale, str): 40 ↛ 45line 40 didn't jump to line 45, because the condition on line 40 was never false
41 locales = [locale.replace("-", "_")]
43 # This guarantees a FIFO ordering of elements in `locales` based on the final
44 # locale string while discarding duplicates after processing
45 elif isinstance(locale, (list, tuple, set)):
46 locales = []
47 for code in locale:
48 if not isinstance(code, str):
49 raise TypeError('The locale "%s" must be a string.' % str(code))
50 final_locale = code.replace("-", "_")
51 if final_locale not in locales:
52 locales.append(final_locale)
54 elif isinstance(locale, OrderedDict):
55 assert all(isinstance(v, (int, float)) for v in locale.values())
56 odict = OrderedDict()
57 for k, v in locale.items():
58 key = k.replace("-", "_")
59 odict[key] = v
60 locales = list(odict.keys())
61 self._weights = list(odict.values())
63 else:
64 locales = [DEFAULT_LOCALE]
66 for locale in locales:
67 self._factory_map[locale] = Factory.create(
68 locale,
69 providers,
70 generator,
71 includes,
72 use_weighting=use_weighting,
73 **config,
74 )
76 self._locales = locales
77 self._factories = list(self._factory_map.values())
79 def __dir__(self):
80 attributes = set(super(Faker, self).__dir__())
81 for factory in self.factories:
82 attributes |= {attr for attr in dir(factory) if not attr.startswith("_")}
83 return sorted(attributes)
85 def __getitem__(self, locale: str) -> Generator:
86 return self._factory_map[locale.replace("-", "_")]
88 def __getattribute__(self, attr: str) -> Any:
89 """
90 Handles the "attribute resolution" behavior for declared members of this proxy class
92 The class method `seed` cannot be called from an instance.
94 :param attr: attribute name
95 :return: the appropriate attribute
96 """
97 if attr == "seed": 97 ↛ 98line 97 didn't jump to line 98, because the condition on line 97 was never true
98 msg = "Calling `.seed()` on instances is deprecated. " "Use the class method `Faker.seed()` instead."
99 raise TypeError(msg)
100 else:
101 return super().__getattribute__(attr)
103 def __getattr__(self, attr: str) -> Any:
104 """
105 Handles cache access and proxying behavior
107 :param attr: attribute name
108 :return: the appropriate attribute
109 """
110 if len(self._factories) == 1: 110 ↛ 112line 110 didn't jump to line 112, because the condition on line 110 was never false
111 return getattr(self._factories[0], attr)
112 elif attr in self.generator_attrs:
113 msg = "Proxying calls to `%s` is not implemented in multiple locale mode." % attr
114 raise NotImplementedError(msg)
115 elif self.cache_pattern.match(attr):
116 msg = "Cached attribute `%s` does not exist" % attr
117 raise AttributeError(msg)
118 else:
119 factory = self._select_factory(attr)
120 return getattr(factory, attr)
122 def __deepcopy__(self, memodict: Dict = {}) -> "Faker":
123 cls = self.__class__
124 result = cls.__new__(cls)
125 result._locales = copy.deepcopy(self._locales)
126 result._factories = copy.deepcopy(self._factories)
127 result._factory_map = copy.deepcopy(self._factory_map)
128 result._weights = copy.deepcopy(self._weights)
129 result._unique_proxy = UniqueProxy(self)
130 result._unique_proxy._seen = {k: {result._unique_proxy._sentinel} for k in self._unique_proxy._seen.keys()}
131 return result
133 def __setstate__(self, state: Any) -> None:
134 self.__dict__.update(state)
136 @property
137 def unique(self) -> "UniqueProxy":
138 return self._unique_proxy
140 def _select_factory(self, method_name: str) -> Factory:
141 """
142 Returns a random factory that supports the provider method
144 :param method_name: Name of provider method
145 :return: A factory that supports the provider method
146 """
148 factories, weights = self._map_provider_method(method_name)
150 if len(factories) == 0:
151 msg = f"No generator object has attribute {method_name!r}"
152 raise AttributeError(msg)
153 elif len(factories) == 1:
154 return factories[0]
156 if weights:
157 factory = self._select_factory_distribution(factories, weights)
158 else:
159 factory = self._select_factory_choice(factories)
160 return factory
162 def _select_factory_distribution(self, factories, weights):
163 return choices_distribution(factories, weights, random, length=1)[0]
165 def _select_factory_choice(self, factories):
166 return random.choice(factories)
168 def _map_provider_method(self, method_name: str) -> Tuple[List[Factory], Optional[List[float]]]:
169 """
170 Creates a 2-tuple of factories and weights for the given provider method name
172 The first element of the tuple contains a list of compatible factories.
173 The second element of the tuple contains a list of distribution weights.
175 :param method_name: Name of provider method
176 :return: 2-tuple (factories, weights)
177 """
179 # Return cached mapping if it exists for given method
180 attr = f"_cached_{method_name}_mapping"
181 if hasattr(self, attr):
182 return getattr(self, attr)
184 # Create mapping if it does not exist
185 if self._weights:
186 value = [
187 (factory, weight)
188 for factory, weight in zip(self.factories, self._weights)
189 if hasattr(factory, method_name)
190 ]
191 factories, weights = zip(*value)
192 mapping = list(factories), list(weights)
193 else:
194 value = [factory for factory in self.factories if hasattr(factory, method_name)] # type: ignore
195 mapping = value, None # type: ignore
197 # Then cache and return results
198 setattr(self, attr, mapping)
199 return mapping
201 @classmethod
202 def seed(cls, seed: Optional[SeedType] = None) -> None:
203 """
204 Hashables the shared `random.Random` object across all factories
206 :param seed: seed value
207 """
208 Generator.seed(seed)
210 def seed_instance(self, seed: Optional[SeedType] = None) -> None:
211 """
212 Creates and seeds a new `random.Random` object for each factory
214 :param seed: seed value
215 """
216 for factory in self._factories:
217 factory.seed_instance(seed)
219 def seed_locale(self, locale: str, seed: Optional[SeedType] = None) -> None:
220 """
221 Creates and seeds a new `random.Random` object for the factory of the specified locale
223 :param locale: locale string
224 :param seed: seed value
225 """
226 self._factory_map[locale.replace("-", "_")].seed_instance(seed)
228 @property
229 def random(self) -> Random:
230 """
231 Proxies `random` getter calls
233 In single locale mode, this will be proxied to the `random` getter
234 of the only internal `Generator` object. Subclasses will have to
235 implement desired behavior in multiple locale mode.
236 """
238 if len(self._factories) == 1:
239 return self._factories[0].random
240 else:
241 msg = "Proxying `random` getter calls is not implemented in multiple locale mode."
242 raise NotImplementedError(msg)
244 @random.setter
245 def random(self, value: Random) -> None:
246 """
247 Proxies `random` setter calls
249 In single locale mode, this will be proxied to the `random` setter
250 of the only internal `Generator` object. Subclasses will have to
251 implement desired behavior in multiple locale mode.
252 """
254 if len(self._factories) == 1:
255 self._factories[0].random = value
256 else:
257 msg = "Proxying `random` setter calls is not implemented in multiple locale mode."
258 raise NotImplementedError(msg)
260 @property
261 def locales(self) -> List[str]:
262 return list(self._locales)
264 @property
265 def weights(self) -> Optional[List[Union[int, float]]]:
266 return self._weights
268 @property
269 def factories(self) -> List[Generator]:
270 return self._factories
272 def items(self) -> List[Tuple[str, Generator]]:
273 return list(self._factory_map.items())
276class UniqueProxy:
277 def __init__(self, proxy: Faker):
278 self._proxy = proxy
279 self._seen: Dict = {}
280 self._sentinel = object()
282 def clear(self) -> None:
283 self._seen = {}
285 def __getattr__(self, name: str) -> Any:
286 obj = getattr(self._proxy, name)
287 if callable(obj):
288 return self._wrap(name, obj)
289 else:
290 raise TypeError("Accessing non-functions through .unique is not supported.")
292 def __getstate__(self):
293 # Copy the object's state from self.__dict__ which contains
294 # all our instance attributes. Always use the dict.copy()
295 # method to avoid modifying the original state.
296 state = self.__dict__.copy()
297 return state
299 def __setstate__(self, state):
300 self.__dict__.update(state)
302 def _wrap(self, name: str, function: Callable) -> Callable:
303 @functools.wraps(function)
304 def wrapper(*args, **kwargs):
305 key = (name, args, tuple(sorted(kwargs.items())))
307 generated = self._seen.setdefault(key, {self._sentinel})
309 # With use of a sentinel value rather than None, we leave
310 # None open as a valid return value.
311 retval = self._sentinel
313 for i in range(_UNIQUE_ATTEMPTS):
314 if retval not in generated:
315 break
316 retval = function(*args, **kwargs)
317 else:
318 raise UniquenessException(f"Got duplicated values after {_UNIQUE_ATTEMPTS:,} iterations.")
320 generated.add(retval)
322 return retval
324 return wrapper