Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/factory/builder.py: 81%
156 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1"""Build factory instances."""
3import collections
5from . import enums, errors, utils
7DeclarationWithContext = collections.namedtuple(
8 'DeclarationWithContext',
9 ['name', 'declaration', 'context'],
10)
13class DeclarationSet:
14 """A set of declarations, including the recursive parameters.
16 Attributes:
17 declarations (dict(name => declaration)): the top-level declarations
18 contexts (dict(name => dict(subfield => value))): the nested parameters related
19 to a given top-level declaration
21 This object behaves similarly to a dict mapping a top-level declaration name to a
22 DeclarationWithContext, containing field name, declaration object and extra context.
23 """
25 def __init__(self, initial=None):
26 self.declarations = {}
27 self.contexts = collections.defaultdict(dict)
28 self.update(initial or {})
30 @classmethod
31 def split(cls, entry):
32 """Split a declaration name into a (declaration, subpath) tuple.
34 Examples:
35 >>> DeclarationSet.split('foo__bar')
36 ('foo', 'bar')
37 >>> DeclarationSet.split('foo')
38 ('foo', None)
39 >>> DeclarationSet.split('foo__bar__baz')
40 ('foo', 'bar__baz')
41 """
42 if enums.SPLITTER in entry: 42 ↛ 43line 42 didn't jump to line 43, because the condition on line 42 was never true
43 return entry.split(enums.SPLITTER, 1)
44 else:
45 return (entry, None)
47 @classmethod
48 def join(cls, root, subkey):
49 """Rebuild a full declaration name from its components.
51 for every string x, we have `join(split(x)) == x`.
52 """
53 if subkey is None:
54 return root
55 return enums.SPLITTER.join((root, subkey))
57 def copy(self):
58 return self.__class__(self.as_dict())
60 def update(self, values):
61 """Add new declarations to this set/
63 Args:
64 values (dict(name, declaration)): the declarations to ingest.
65 """
66 for k, v in values.items():
67 root, sub = self.split(k)
68 if sub is None: 68 ↛ 71line 68 didn't jump to line 71, because the condition on line 68 was never false
69 self.declarations[root] = v
70 else:
71 self.contexts[root][sub] = v
73 extra_context_keys = set(self.contexts) - set(self.declarations)
74 if extra_context_keys: 74 ↛ 75line 74 didn't jump to line 75, because the condition on line 74 was never true
75 raise errors.InvalidDeclarationError(
76 "Received deep context for unknown fields: %r (known=%r)" % (
77 {
78 self.join(root, sub): v
79 for root in extra_context_keys
80 for sub, v in self.contexts[root].items()
81 },
82 sorted(self.declarations),
83 )
84 )
86 def filter(self, entries):
87 """Filter a set of declarations: keep only those related to this object.
89 This will keep:
90 - Declarations that 'override' the current ones
91 - Declarations that are parameters to current ones
92 """
93 return [
94 entry for entry in entries
95 if self.split(entry)[0] in self.declarations
96 ]
98 def sorted(self):
99 return utils.sort_ordered_objects(
100 self.declarations,
101 getter=lambda entry: self.declarations[entry],
102 )
104 def __contains__(self, key):
105 return key in self.declarations
107 def __getitem__(self, key):
108 return DeclarationWithContext(
109 name=key,
110 declaration=self.declarations[key],
111 context=self.contexts[key],
112 )
114 def __iter__(self):
115 return iter(self.declarations)
117 def values(self):
118 """Retrieve the list of declarations, with their context."""
119 for name in self:
120 yield self[name]
122 def _items(self):
123 """Extract a list of (key, value) pairs, suitable for our __init__."""
124 for name in self.declarations:
125 yield name, self.declarations[name]
126 for subkey, value in self.contexts[name].items(): 126 ↛ 127line 126 didn't jump to line 127, because the loop on line 126 never started
127 yield self.join(name, subkey), value
129 def as_dict(self):
130 """Return a dict() suitable for our __init__."""
131 return dict(self._items())
133 def __repr__(self):
134 return '<DeclarationSet: %r>' % self.as_dict()
137def parse_declarations(decls, base_pre=None, base_post=None):
138 pre_declarations = base_pre.copy() if base_pre else DeclarationSet()
139 post_declarations = base_post.copy() if base_post else DeclarationSet()
141 # Inject extra declarations, splitting between known-to-be-post and undetermined
142 extra_post = {}
143 extra_maybenonpost = {}
144 for k, v in decls.items():
145 if enums.get_builder_phase(v) == enums.BuilderPhase.POST_INSTANTIATION:
146 if k in pre_declarations: 146 ↛ 149line 146 didn't jump to line 149, because the condition on line 146 was never true
147 # Conflict: PostGenerationDeclaration with the same
148 # name as a BaseDeclaration
149 raise errors.InvalidDeclarationError(
150 "PostGenerationDeclaration %s=%r shadows declaration %r"
151 % (k, v, pre_declarations[k])
152 )
153 extra_post[k] = v
154 elif k in post_declarations: 154 ↛ 157line 154 didn't jump to line 157, because the condition on line 154 was never true
155 # Passing in a scalar value to a PostGenerationDeclaration
156 # Set it as `key__`
157 magic_key = post_declarations.join(k, '')
158 extra_post[magic_key] = v
159 else:
160 extra_maybenonpost[k] = v
162 # Start with adding new post-declarations
163 post_declarations.update(extra_post)
165 # Fill in extra post-declaration context
166 post_overrides = post_declarations.filter(extra_maybenonpost)
167 post_declarations.update({
168 k: v
169 for k, v in extra_maybenonpost.items()
170 if k in post_overrides
171 })
173 # Anything else is pre_declarations
174 pre_declarations.update({
175 k: v
176 for k, v in extra_maybenonpost.items()
177 if k not in post_overrides
178 })
180 return pre_declarations, post_declarations
183class BuildStep:
184 def __init__(self, builder, sequence, parent_step=None):
185 self.builder = builder
186 self.sequence = sequence
187 self.attributes = {}
188 self.parent_step = parent_step
189 self.stub = None
191 def resolve(self, declarations):
192 self.stub = Resolver(
193 declarations=declarations,
194 step=self,
195 sequence=self.sequence,
196 )
198 for field_name in declarations:
199 self.attributes[field_name] = getattr(self.stub, field_name)
201 @property
202 def chain(self):
203 if self.parent_step:
204 parent_chain = self.parent_step.chain
205 else:
206 parent_chain = ()
207 return (self.stub,) + parent_chain
209 def recurse(self, factory, declarations, force_sequence=None):
210 from . import base
211 if not issubclass(factory, base.BaseFactory): 211 ↛ 212line 211 didn't jump to line 212, because the condition on line 211 was never true
212 raise errors.AssociatedClassError(
213 "%r: Attempting to recursing into a non-factory object %r"
214 % (self, factory))
215 builder = self.builder.recurse(factory._meta, declarations)
216 return builder.build(parent_step=self, force_sequence=force_sequence)
218 def __repr__(self):
219 return f"<BuildStep for {self.builder!r}>"
222class StepBuilder:
223 """A factory instantiation step.
225 Attributes:
226 - parent: the parent StepBuilder, or None for the root step
227 - extras: the passed-in kwargs for this branch
228 - factory: the factory class being built
229 - strategy: the strategy to use
230 """
231 def __init__(self, factory_meta, extras, strategy):
232 self.factory_meta = factory_meta
233 self.strategy = strategy
234 self.extras = extras
235 self.force_init_sequence = extras.pop('__sequence', None)
237 def build(self, parent_step=None, force_sequence=None):
238 """Build a factory instance."""
239 # TODO: Handle "batch build" natively
240 pre, post = parse_declarations(
241 self.extras,
242 base_pre=self.factory_meta.pre_declarations,
243 base_post=self.factory_meta.post_declarations,
244 )
246 if force_sequence is not None: 246 ↛ 247line 246 didn't jump to line 247, because the condition on line 246 was never true
247 sequence = force_sequence
248 elif self.force_init_sequence is not None: 248 ↛ 249line 248 didn't jump to line 249, because the condition on line 248 was never true
249 sequence = self.force_init_sequence
250 else:
251 sequence = self.factory_meta.next_sequence()
253 step = BuildStep(
254 builder=self,
255 sequence=sequence,
256 parent_step=parent_step,
257 )
258 step.resolve(pre)
260 args, kwargs = self.factory_meta.prepare_arguments(step.attributes)
262 instance = self.factory_meta.instantiate(
263 step=step,
264 args=args,
265 kwargs=kwargs,
266 )
268 postgen_results = {}
269 for declaration_name in post.sorted():
270 declaration = post[declaration_name]
271 postgen_results[declaration_name] = declaration.declaration.evaluate_post(
272 instance=instance,
273 step=step,
274 overrides=declaration.context,
275 )
276 self.factory_meta.use_postgeneration_results(
277 instance=instance,
278 step=step,
279 results=postgen_results,
280 )
281 return instance
283 def recurse(self, factory_meta, extras):
284 """Recurse into a sub-factory call."""
285 return self.__class__(factory_meta, extras, strategy=self.strategy)
287 def __repr__(self):
288 return f"<StepBuilder({self.factory_meta!r}, strategy={self.strategy!r})>"
291class Resolver:
292 """Resolve a set of declarations.
294 Attributes are set at instantiation time, values are computed lazily.
296 Attributes:
297 __initialized (bool): whether this object's __init__ as run. If set,
298 setting any attribute will be prevented.
299 __declarations (dict): maps attribute name to their declaration
300 __values (dict): maps attribute name to computed value
301 __pending (str list): names of the attributes whose value is being
302 computed. This allows to detect cyclic lazy attribute definition.
303 __step (BuildStep): the BuildStep related to this resolver.
304 This allows to have the value of a field depend on the value of
305 another field
306 """
308 __initialized = False
310 def __init__(self, declarations, step, sequence):
311 self.__declarations = declarations
312 self.__step = step
314 self.__values = {}
315 self.__pending = []
317 self.__initialized = True
319 @property
320 def factory_parent(self):
321 return self.__step.parent_step.stub if self.__step.parent_step else None
323 def __repr__(self):
324 return '<Resolver for %r>' % self.__step
326 def __getattr__(self, name):
327 """Retrieve an attribute's value.
329 This will compute it if needed, unless it is already on the list of
330 attributes being computed.
331 """
332 if name in self.__pending: 332 ↛ 333line 332 didn't jump to line 333, because the condition on line 332 was never true
333 raise errors.CyclicDefinitionError(
334 "Cyclic lazy attribute definition for %r; cycle found in %r." %
335 (name, self.__pending))
336 elif name in self.__values:
337 return self.__values[name]
338 elif name in self.__declarations: 338 ↛ 356line 338 didn't jump to line 356, because the condition on line 338 was never false
339 declaration = self.__declarations[name]
340 value = declaration.declaration
341 if enums.get_builder_phase(value) == enums.BuilderPhase.ATTRIBUTE_RESOLUTION:
342 self.__pending.append(name)
343 try:
344 value = value.evaluate_pre(
345 instance=self,
346 step=self.__step,
347 overrides=declaration.context,
348 )
349 finally:
350 last = self.__pending.pop()
351 assert name == last
353 self.__values[name] = value
354 return value
355 else:
356 raise AttributeError(
357 "The parameter %r is unknown. Evaluated attributes are %r, "
358 "definitions are %r." % (name, self.__values, self.__declarations))
360 def __setattr__(self, name, value):
361 """Prevent setting attributes once __init__ is done."""
362 if not self.__initialized: 362 ↛ 365line 362 didn't jump to line 365, because the condition on line 362 was never false
363 return super().__setattr__(name, value)
364 else:
365 raise AttributeError('Setting of object attributes is not allowed')