Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/factory/builder.py: 81%

156 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1"""Build factory instances.""" 

2 

3import collections 

4 

5from . import enums, errors, utils 

6 

7DeclarationWithContext = collections.namedtuple( 

8 'DeclarationWithContext', 

9 ['name', 'declaration', 'context'], 

10) 

11 

12 

13class DeclarationSet: 

14 """A set of declarations, including the recursive parameters. 

15 

16 Attributes: 

17 declarations (dict(name => declaration)): the top-level declarations 

18 contexts (dict(name => dict(subfield => value))): the nested parameters related 

19 to a given top-level declaration 

20 

21 This object behaves similarly to a dict mapping a top-level declaration name to a 

22 DeclarationWithContext, containing field name, declaration object and extra context. 

23 """ 

24 

25 def __init__(self, initial=None): 

26 self.declarations = {} 

27 self.contexts = collections.defaultdict(dict) 

28 self.update(initial or {}) 

29 

30 @classmethod 

31 def split(cls, entry): 

32 """Split a declaration name into a (declaration, subpath) tuple. 

33 

34 Examples: 

35 >>> DeclarationSet.split('foo__bar') 

36 ('foo', 'bar') 

37 >>> DeclarationSet.split('foo') 

38 ('foo', None) 

39 >>> DeclarationSet.split('foo__bar__baz') 

40 ('foo', 'bar__baz') 

41 """ 

42 if enums.SPLITTER in entry: 42 ↛ 43line 42 didn't jump to line 43, because the condition on line 42 was never true

43 return entry.split(enums.SPLITTER, 1) 

44 else: 

45 return (entry, None) 

46 

47 @classmethod 

48 def join(cls, root, subkey): 

49 """Rebuild a full declaration name from its components. 

50 

51 for every string x, we have `join(split(x)) == x`. 

52 """ 

53 if subkey is None: 

54 return root 

55 return enums.SPLITTER.join((root, subkey)) 

56 

57 def copy(self): 

58 return self.__class__(self.as_dict()) 

59 

60 def update(self, values): 

61 """Add new declarations to this set/ 

62 

63 Args: 

64 values (dict(name, declaration)): the declarations to ingest. 

65 """ 

66 for k, v in values.items(): 

67 root, sub = self.split(k) 

68 if sub is None: 68 ↛ 71line 68 didn't jump to line 71, because the condition on line 68 was never false

69 self.declarations[root] = v 

70 else: 

71 self.contexts[root][sub] = v 

72 

73 extra_context_keys = set(self.contexts) - set(self.declarations) 

74 if extra_context_keys: 74 ↛ 75line 74 didn't jump to line 75, because the condition on line 74 was never true

75 raise errors.InvalidDeclarationError( 

76 "Received deep context for unknown fields: %r (known=%r)" % ( 

77 { 

78 self.join(root, sub): v 

79 for root in extra_context_keys 

80 for sub, v in self.contexts[root].items() 

81 }, 

82 sorted(self.declarations), 

83 ) 

84 ) 

85 

86 def filter(self, entries): 

87 """Filter a set of declarations: keep only those related to this object. 

88 

89 This will keep: 

90 - Declarations that 'override' the current ones 

91 - Declarations that are parameters to current ones 

92 """ 

93 return [ 

94 entry for entry in entries 

95 if self.split(entry)[0] in self.declarations 

96 ] 

97 

98 def sorted(self): 

99 return utils.sort_ordered_objects( 

100 self.declarations, 

101 getter=lambda entry: self.declarations[entry], 

102 ) 

103 

104 def __contains__(self, key): 

105 return key in self.declarations 

106 

107 def __getitem__(self, key): 

108 return DeclarationWithContext( 

109 name=key, 

110 declaration=self.declarations[key], 

111 context=self.contexts[key], 

112 ) 

113 

114 def __iter__(self): 

115 return iter(self.declarations) 

116 

117 def values(self): 

118 """Retrieve the list of declarations, with their context.""" 

119 for name in self: 

120 yield self[name] 

121 

122 def _items(self): 

123 """Extract a list of (key, value) pairs, suitable for our __init__.""" 

124 for name in self.declarations: 

125 yield name, self.declarations[name] 

126 for subkey, value in self.contexts[name].items(): 126 ↛ 127line 126 didn't jump to line 127, because the loop on line 126 never started

127 yield self.join(name, subkey), value 

128 

129 def as_dict(self): 

130 """Return a dict() suitable for our __init__.""" 

131 return dict(self._items()) 

132 

133 def __repr__(self): 

134 return '<DeclarationSet: %r>' % self.as_dict() 

135 

136 

137def parse_declarations(decls, base_pre=None, base_post=None): 

138 pre_declarations = base_pre.copy() if base_pre else DeclarationSet() 

139 post_declarations = base_post.copy() if base_post else DeclarationSet() 

140 

141 # Inject extra declarations, splitting between known-to-be-post and undetermined 

142 extra_post = {} 

143 extra_maybenonpost = {} 

144 for k, v in decls.items(): 

145 if enums.get_builder_phase(v) == enums.BuilderPhase.POST_INSTANTIATION: 

146 if k in pre_declarations: 146 ↛ 149line 146 didn't jump to line 149, because the condition on line 146 was never true

147 # Conflict: PostGenerationDeclaration with the same 

148 # name as a BaseDeclaration 

149 raise errors.InvalidDeclarationError( 

150 "PostGenerationDeclaration %s=%r shadows declaration %r" 

151 % (k, v, pre_declarations[k]) 

152 ) 

153 extra_post[k] = v 

154 elif k in post_declarations: 154 ↛ 157line 154 didn't jump to line 157, because the condition on line 154 was never true

155 # Passing in a scalar value to a PostGenerationDeclaration 

156 # Set it as `key__` 

157 magic_key = post_declarations.join(k, '') 

158 extra_post[magic_key] = v 

159 else: 

160 extra_maybenonpost[k] = v 

161 

162 # Start with adding new post-declarations 

163 post_declarations.update(extra_post) 

164 

165 # Fill in extra post-declaration context 

166 post_overrides = post_declarations.filter(extra_maybenonpost) 

167 post_declarations.update({ 

168 k: v 

169 for k, v in extra_maybenonpost.items() 

170 if k in post_overrides 

171 }) 

172 

173 # Anything else is pre_declarations 

174 pre_declarations.update({ 

175 k: v 

176 for k, v in extra_maybenonpost.items() 

177 if k not in post_overrides 

178 }) 

179 

180 return pre_declarations, post_declarations 

181 

182 

183class BuildStep: 

184 def __init__(self, builder, sequence, parent_step=None): 

185 self.builder = builder 

186 self.sequence = sequence 

187 self.attributes = {} 

188 self.parent_step = parent_step 

189 self.stub = None 

190 

191 def resolve(self, declarations): 

192 self.stub = Resolver( 

193 declarations=declarations, 

194 step=self, 

195 sequence=self.sequence, 

196 ) 

197 

198 for field_name in declarations: 

199 self.attributes[field_name] = getattr(self.stub, field_name) 

200 

201 @property 

202 def chain(self): 

203 if self.parent_step: 

204 parent_chain = self.parent_step.chain 

205 else: 

206 parent_chain = () 

207 return (self.stub,) + parent_chain 

208 

209 def recurse(self, factory, declarations, force_sequence=None): 

210 from . import base 

211 if not issubclass(factory, base.BaseFactory): 211 ↛ 212line 211 didn't jump to line 212, because the condition on line 211 was never true

212 raise errors.AssociatedClassError( 

213 "%r: Attempting to recursing into a non-factory object %r" 

214 % (self, factory)) 

215 builder = self.builder.recurse(factory._meta, declarations) 

216 return builder.build(parent_step=self, force_sequence=force_sequence) 

217 

218 def __repr__(self): 

219 return f"<BuildStep for {self.builder!r}>" 

220 

221 

222class StepBuilder: 

223 """A factory instantiation step. 

224 

225 Attributes: 

226 - parent: the parent StepBuilder, or None for the root step 

227 - extras: the passed-in kwargs for this branch 

228 - factory: the factory class being built 

229 - strategy: the strategy to use 

230 """ 

231 def __init__(self, factory_meta, extras, strategy): 

232 self.factory_meta = factory_meta 

233 self.strategy = strategy 

234 self.extras = extras 

235 self.force_init_sequence = extras.pop('__sequence', None) 

236 

237 def build(self, parent_step=None, force_sequence=None): 

238 """Build a factory instance.""" 

239 # TODO: Handle "batch build" natively 

240 pre, post = parse_declarations( 

241 self.extras, 

242 base_pre=self.factory_meta.pre_declarations, 

243 base_post=self.factory_meta.post_declarations, 

244 ) 

245 

246 if force_sequence is not None: 246 ↛ 247line 246 didn't jump to line 247, because the condition on line 246 was never true

247 sequence = force_sequence 

248 elif self.force_init_sequence is not None: 248 ↛ 249line 248 didn't jump to line 249, because the condition on line 248 was never true

249 sequence = self.force_init_sequence 

250 else: 

251 sequence = self.factory_meta.next_sequence() 

252 

253 step = BuildStep( 

254 builder=self, 

255 sequence=sequence, 

256 parent_step=parent_step, 

257 ) 

258 step.resolve(pre) 

259 

260 args, kwargs = self.factory_meta.prepare_arguments(step.attributes) 

261 

262 instance = self.factory_meta.instantiate( 

263 step=step, 

264 args=args, 

265 kwargs=kwargs, 

266 ) 

267 

268 postgen_results = {} 

269 for declaration_name in post.sorted(): 

270 declaration = post[declaration_name] 

271 postgen_results[declaration_name] = declaration.declaration.evaluate_post( 

272 instance=instance, 

273 step=step, 

274 overrides=declaration.context, 

275 ) 

276 self.factory_meta.use_postgeneration_results( 

277 instance=instance, 

278 step=step, 

279 results=postgen_results, 

280 ) 

281 return instance 

282 

283 def recurse(self, factory_meta, extras): 

284 """Recurse into a sub-factory call.""" 

285 return self.__class__(factory_meta, extras, strategy=self.strategy) 

286 

287 def __repr__(self): 

288 return f"<StepBuilder({self.factory_meta!r}, strategy={self.strategy!r})>" 

289 

290 

291class Resolver: 

292 """Resolve a set of declarations. 

293 

294 Attributes are set at instantiation time, values are computed lazily. 

295 

296 Attributes: 

297 __initialized (bool): whether this object's __init__ as run. If set, 

298 setting any attribute will be prevented. 

299 __declarations (dict): maps attribute name to their declaration 

300 __values (dict): maps attribute name to computed value 

301 __pending (str list): names of the attributes whose value is being 

302 computed. This allows to detect cyclic lazy attribute definition. 

303 __step (BuildStep): the BuildStep related to this resolver. 

304 This allows to have the value of a field depend on the value of 

305 another field 

306 """ 

307 

308 __initialized = False 

309 

310 def __init__(self, declarations, step, sequence): 

311 self.__declarations = declarations 

312 self.__step = step 

313 

314 self.__values = {} 

315 self.__pending = [] 

316 

317 self.__initialized = True 

318 

319 @property 

320 def factory_parent(self): 

321 return self.__step.parent_step.stub if self.__step.parent_step else None 

322 

323 def __repr__(self): 

324 return '<Resolver for %r>' % self.__step 

325 

326 def __getattr__(self, name): 

327 """Retrieve an attribute's value. 

328 

329 This will compute it if needed, unless it is already on the list of 

330 attributes being computed. 

331 """ 

332 if name in self.__pending: 332 ↛ 333line 332 didn't jump to line 333, because the condition on line 332 was never true

333 raise errors.CyclicDefinitionError( 

334 "Cyclic lazy attribute definition for %r; cycle found in %r." % 

335 (name, self.__pending)) 

336 elif name in self.__values: 

337 return self.__values[name] 

338 elif name in self.__declarations: 338 ↛ 356line 338 didn't jump to line 356, because the condition on line 338 was never false

339 declaration = self.__declarations[name] 

340 value = declaration.declaration 

341 if enums.get_builder_phase(value) == enums.BuilderPhase.ATTRIBUTE_RESOLUTION: 

342 self.__pending.append(name) 

343 try: 

344 value = value.evaluate_pre( 

345 instance=self, 

346 step=self.__step, 

347 overrides=declaration.context, 

348 ) 

349 finally: 

350 last = self.__pending.pop() 

351 assert name == last 

352 

353 self.__values[name] = value 

354 return value 

355 else: 

356 raise AttributeError( 

357 "The parameter %r is unknown. Evaluated attributes are %r, " 

358 "definitions are %r." % (name, self.__values, self.__declarations)) 

359 

360 def __setattr__(self, name, value): 

361 """Prevent setting attributes once __init__ is done.""" 

362 if not self.__initialized: 362 ↛ 365line 362 didn't jump to line 365, because the condition on line 362 was never false

363 return super().__setattr__(name, value) 

364 else: 

365 raise AttributeError('Setting of object attributes is not allowed')