Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/yaml/parser.py: 9%

352 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

# The following YAML grammar is LL(1) and is parsed by a recursive descent
# parser.
#
# stream            ::= STREAM-START implicit_document? explicit_document* STREAM-END
# implicit_document ::= block_node DOCUMENT-END*
# explicit_document ::= DIRECTIVE* DOCUMENT-START block_node? DOCUMENT-END*
# block_node_or_indentless_sequence ::=
#                       ALIAS
#                       | properties (block_content | indentless_block_sequence)?
#                       | block_content
#                       | indentless_block_sequence
# block_node        ::= ALIAS
#                       | properties block_content?
#                       | block_content
# flow_node         ::= ALIAS
#                       | properties flow_content?
#                       | flow_content
# properties        ::= TAG ANCHOR? | ANCHOR TAG?
# block_content     ::= block_collection | flow_collection | SCALAR
# flow_content      ::= flow_collection | SCALAR
# block_collection  ::= block_sequence | block_mapping
# flow_collection   ::= flow_sequence | flow_mapping
# block_sequence    ::= BLOCK-SEQUENCE-START (BLOCK-ENTRY block_node?)* BLOCK-END
# indentless_sequence ::= (BLOCK-ENTRY block_node?)+
# block_mapping     ::= BLOCK-MAPPING_START
#                       ((KEY block_node_or_indentless_sequence?)?
#                       (VALUE block_node_or_indentless_sequence?)?)*
#                       BLOCK-END
# flow_sequence     ::= FLOW-SEQUENCE-START
#                       (flow_sequence_entry FLOW-ENTRY)*
#                       flow_sequence_entry?
#                       FLOW-SEQUENCE-END
# flow_sequence_entry ::= flow_node | KEY flow_node? (VALUE flow_node?)?
# flow_mapping      ::= FLOW-MAPPING-START
#                       (flow_mapping_entry FLOW-ENTRY)*
#                       flow_mapping_entry?
#                       FLOW-MAPPING-END
# flow_mapping_entry ::= flow_node | KEY flow_node? (VALUE flow_node?)?
#
# FIRST sets:
#
# stream: { STREAM-START }
# explicit_document: { DIRECTIVE DOCUMENT-START }
# implicit_document: FIRST(block_node)
# block_node: { ALIAS TAG ANCHOR SCALAR BLOCK-SEQUENCE-START BLOCK-MAPPING-START FLOW-SEQUENCE-START FLOW-MAPPING-START }
# flow_node: { ALIAS ANCHOR TAG SCALAR FLOW-SEQUENCE-START FLOW-MAPPING-START }
# block_content: { BLOCK-SEQUENCE-START BLOCK-MAPPING-START FLOW-SEQUENCE-START FLOW-MAPPING-START SCALAR }
# flow_content: { FLOW-SEQUENCE-START FLOW-MAPPING-START SCALAR }
# block_collection: { BLOCK-SEQUENCE-START BLOCK-MAPPING-START }
# flow_collection: { FLOW-SEQUENCE-START FLOW-MAPPING-START }
# block_sequence: { BLOCK-SEQUENCE-START }
# block_mapping: { BLOCK-MAPPING-START }
# block_node_or_indentless_sequence: { ALIAS ANCHOR TAG SCALAR BLOCK-SEQUENCE-START BLOCK-MAPPING-START FLOW-SEQUENCE-START FLOW-MAPPING-START BLOCK-ENTRY }
# indentless_sequence: { ENTRY }
# flow_collection: { FLOW-SEQUENCE-START FLOW-MAPPING-START }
# flow_sequence: { FLOW-SEQUENCE-START }
# flow_mapping: { FLOW-MAPPING-START }
# flow_sequence_entry: { ALIAS ANCHOR TAG SCALAR FLOW-SEQUENCE-START FLOW-MAPPING-START KEY }
# flow_mapping_entry: { ALIAS ANCHOR TAG SCALAR FLOW-SEQUENCE-START FLOW-MAPPING-START KEY }

62__all__ = ['Parser', 'ParserError'] 

63 

64from .error import MarkedYAMLError 

65from .tokens import * 

66from .events import * 

67from .scanner import * 

68 

class ParserError(MarkedYAMLError):
    """Raised when the token stream violates the YAML grammar.

    Carries the context/problem marks supplied by MarkedYAMLError so the
    offending position in the input can be reported.
    """
    pass

71 

class Parser:
    """Turn the scanner's token stream into a stream of parsing events.

    The grammar is LL(1) and is parsed by recursive descent, but the
    recursion is reified: each ``parse_*`` method emits exactly one event
    and records which method should run next in ``self.state`` (and, for
    nested constructs, on the ``self.states`` stack).  ``self.marks``
    remembers the start positions of open collections for error messages.

    NOTE(review): ``check_token``/``peek_token``/``get_token`` are not
    defined here — they are expected from a Scanner composed with this
    class by the surrounding loader machinery; confirm against the
    composing class.
    """
    # Since writing a recursive-descendant parser is a straightforward task, we
    # do not give many comments here.

    # Prefixes substituted for the '!' and '!!' handles when the document
    # declares no %TAG directives of its own.
    DEFAULT_TAGS = {
        '!': '!',
        '!!': 'tag:yaml.org,2002:',
    }

    def __init__(self):
        self.current_event = None      # one-event lookahead buffer
        self.yaml_version = None       # (major, minor) from a %YAML directive
        self.tag_handles = {}          # handle -> prefix from %TAG directives
        self.states = []               # stack of pending parse_* continuations
        self.marks = []                # start marks of open collections
        self.state = self.parse_stream_start

    def dispose(self):
        """Reset the state attributes (to clear self-references)."""
        self.states = []
        self.state = None

    def check_event(self, *choices):
        """Return True if the next event matches one of *choices* (or any, if none given)."""
        if self.current_event is None:
            if self.state:
                self.current_event = self.state()
        if self.current_event is not None:
            if not choices:
                return True
            for choice in choices:
                if isinstance(self.current_event, choice):
                    return True
        return False

    def peek_event(self):
        """Return the next event without consuming it."""
        if self.current_event is None:
            if self.state:
                self.current_event = self.state()
        return self.current_event

    def get_event(self):
        """Return the next event and proceed further."""
        if self.current_event is None:
            if self.state:
                self.current_event = self.state()
        value = self.current_event
        self.current_event = None
        return value

    # stream    ::= STREAM-START implicit_document? explicit_document* STREAM-END
    # implicit_document ::= block_node DOCUMENT-END*
    # explicit_document ::= DIRECTIVE* DOCUMENT-START block_node? DOCUMENT-END*

    def parse_stream_start(self):

        # Parse the stream start.
        token = self.get_token()
        event = StreamStartEvent(token.start_mark, token.end_mark,
                encoding=token.encoding)

        # Prepare the next state.
        self.state = self.parse_implicit_document_start

        return event

    def parse_implicit_document_start(self):

        # Parse an implicit document: content with no '---' marker.
        if not self.check_token(DirectiveToken, DocumentStartToken,
                StreamEndToken):
            self.tag_handles = self.DEFAULT_TAGS
            token = self.peek_token()
            start_mark = end_mark = token.start_mark
            event = DocumentStartEvent(start_mark, end_mark,
                    explicit=False)

            # Prepare the next state.
            self.states.append(self.parse_document_end)
            self.state = self.parse_block_node

            return event

        else:
            return self.parse_document_start()

    def parse_document_start(self):

        # Parse any extra document end indicators.
        while self.check_token(DocumentEndToken):
            self.get_token()

        # Parse an explicit document.
        if not self.check_token(StreamEndToken):
            token = self.peek_token()
            start_mark = token.start_mark
            version, tags = self.process_directives()
            if not self.check_token(DocumentStartToken):
                raise ParserError(None, None,
                        "expected '<document start>', but found %r"
                        % self.peek_token().id,
                        self.peek_token().start_mark)
            token = self.get_token()
            end_mark = token.end_mark
            event = DocumentStartEvent(start_mark, end_mark,
                    explicit=True, version=version, tags=tags)
            self.states.append(self.parse_document_end)
            self.state = self.parse_document_content
        else:
            # Parse the end of the stream.
            token = self.get_token()
            event = StreamEndEvent(token.start_mark, token.end_mark)
            assert not self.states
            assert not self.marks
            self.state = None
        return event

    def parse_document_end(self):

        # Parse the document end ('...' marker, possibly absent).
        token = self.peek_token()
        start_mark = end_mark = token.start_mark
        explicit = False
        if self.check_token(DocumentEndToken):
            token = self.get_token()
            end_mark = token.end_mark
            explicit = True
        event = DocumentEndEvent(start_mark, end_mark,
                explicit=explicit)

        # Prepare the next state.
        self.state = self.parse_document_start

        return event

    def parse_document_content(self):
        # An immediately-following document boundary means an empty document.
        if self.check_token(DirectiveToken,
                DocumentStartToken, DocumentEndToken, StreamEndToken):
            event = self.process_empty_scalar(self.peek_token().start_mark)
            self.state = self.states.pop()
            return event
        else:
            return self.parse_block_node()

    def process_directives(self):
        """Consume %YAML/%TAG directives; return (version, tags-or-None)."""
        self.yaml_version = None
        self.tag_handles = {}
        while self.check_token(DirectiveToken):
            token = self.get_token()
            if token.name == 'YAML':
                if self.yaml_version is not None:
                    raise ParserError(None, None,
                            "found duplicate YAML directive", token.start_mark)
                major, minor = token.value
                if major != 1:
                    raise ParserError(None, None,
                            "found incompatible YAML document (version 1.* is required)",
                            token.start_mark)
                self.yaml_version = token.value
            elif token.name == 'TAG':
                handle, prefix = token.value
                if handle in self.tag_handles:
                    raise ParserError(None, None,
                            "duplicate tag handle %r" % handle,
                            token.start_mark)
                self.tag_handles[handle] = prefix
        # Snapshot the explicit handles before the defaults are merged in,
        # so the DocumentStartEvent carries only what the document declared.
        if self.tag_handles:
            value = self.yaml_version, self.tag_handles.copy()
        else:
            value = self.yaml_version, None
        for key in self.DEFAULT_TAGS:
            if key not in self.tag_handles:
                self.tag_handles[key] = self.DEFAULT_TAGS[key]
        return value

    # block_node_or_indentless_sequence ::= ALIAS
    #               | properties (block_content | indentless_block_sequence)?
    #               | block_content
    #               | indentless_block_sequence
    # block_node    ::= ALIAS
    #                   | properties block_content?
    #                   | block_content
    # flow_node     ::= ALIAS
    #                   | properties flow_content?
    #                   | flow_content
    # properties    ::= TAG ANCHOR? | ANCHOR TAG?
    # block_content     ::= block_collection | flow_collection | SCALAR
    # flow_content      ::= flow_collection | SCALAR
    # block_collection  ::= block_sequence | block_mapping
    # flow_collection   ::= flow_sequence | flow_mapping

    def parse_block_node(self):
        return self.parse_node(block=True)

    def parse_flow_node(self):
        return self.parse_node()

    def parse_block_node_or_indentless_sequence(self):
        return self.parse_node(block=True, indentless_sequence=True)

    def parse_node(self, block=False, indentless_sequence=False):
        """Parse a single node: alias, or properties plus content."""
        if self.check_token(AliasToken):
            token = self.get_token()
            event = AliasEvent(token.value, token.start_mark, token.end_mark)
            self.state = self.states.pop()
        else:
            anchor = None
            tag = None
            start_mark = end_mark = tag_mark = None
            # Properties may appear in either order: ANCHOR TAG? or TAG ANCHOR?.
            if self.check_token(AnchorToken):
                token = self.get_token()
                start_mark = token.start_mark
                end_mark = token.end_mark
                anchor = token.value
                if self.check_token(TagToken):
                    token = self.get_token()
                    tag_mark = token.start_mark
                    end_mark = token.end_mark
                    tag = token.value
            elif self.check_token(TagToken):
                token = self.get_token()
                start_mark = tag_mark = token.start_mark
                end_mark = token.end_mark
                tag = token.value
                if self.check_token(AnchorToken):
                    token = self.get_token()
                    end_mark = token.end_mark
                    anchor = token.value
            if tag is not None:
                # Resolve a (handle, suffix) tag against the declared handles.
                handle, suffix = tag
                if handle is not None:
                    if handle not in self.tag_handles:
                        raise ParserError("while parsing a node", start_mark,
                                "found undefined tag handle %r" % handle,
                                tag_mark)
                    tag = self.tag_handles[handle]+suffix
                else:
                    tag = suffix
            #if tag == '!':
            #    raise ParserError("while parsing a node", start_mark,
            #            "found non-specific tag '!'", tag_mark,
            #            "Please check 'http://pyyaml.org/wiki/YAMLNonSpecificTag' and share your opinion.")
            if start_mark is None:
                start_mark = end_mark = self.peek_token().start_mark
            event = None
            implicit = (tag is None or tag == '!')
            if indentless_sequence and self.check_token(BlockEntryToken):
                end_mark = self.peek_token().end_mark
                event = SequenceStartEvent(anchor, tag, implicit,
                        start_mark, end_mark)
                self.state = self.parse_indentless_sequence_entry
            else:
                if self.check_token(ScalarToken):
                    token = self.get_token()
                    end_mark = token.end_mark
                    # implicit is a pair: (plain-resolvable, quoted-resolvable).
                    if (token.plain and tag is None) or tag == '!':
                        implicit = (True, False)
                    elif tag is None:
                        implicit = (False, True)
                    else:
                        implicit = (False, False)
                    event = ScalarEvent(anchor, tag, implicit, token.value,
                            start_mark, end_mark, style=token.style)
                    self.state = self.states.pop()
                elif self.check_token(FlowSequenceStartToken):
                    end_mark = self.peek_token().end_mark
                    event = SequenceStartEvent(anchor, tag, implicit,
                            start_mark, end_mark, flow_style=True)
                    self.state = self.parse_flow_sequence_first_entry
                elif self.check_token(FlowMappingStartToken):
                    end_mark = self.peek_token().end_mark
                    event = MappingStartEvent(anchor, tag, implicit,
                            start_mark, end_mark, flow_style=True)
                    self.state = self.parse_flow_mapping_first_key
                elif block and self.check_token(BlockSequenceStartToken):
                    end_mark = self.peek_token().start_mark
                    event = SequenceStartEvent(anchor, tag, implicit,
                            start_mark, end_mark, flow_style=False)
                    self.state = self.parse_block_sequence_first_entry
                elif block and self.check_token(BlockMappingStartToken):
                    end_mark = self.peek_token().start_mark
                    event = MappingStartEvent(anchor, tag, implicit,
                            start_mark, end_mark, flow_style=False)
                    self.state = self.parse_block_mapping_first_key
                elif anchor is not None or tag is not None:
                    # Empty scalars are allowed even if a tag or an anchor is
                    # specified.
                    event = ScalarEvent(anchor, tag, (implicit, False), '',
                            start_mark, end_mark)
                    self.state = self.states.pop()
                else:
                    if block:
                        node = 'block'
                    else:
                        node = 'flow'
                    token = self.peek_token()
                    raise ParserError("while parsing a %s node" % node, start_mark,
                            "expected the node content, but found %r" % token.id,
                            token.start_mark)
        return event

    # block_sequence ::= BLOCK-SEQUENCE-START (BLOCK-ENTRY block_node?)* BLOCK-END

    def parse_block_sequence_first_entry(self):
        token = self.get_token()
        self.marks.append(token.start_mark)
        return self.parse_block_sequence_entry()

    def parse_block_sequence_entry(self):
        if self.check_token(BlockEntryToken):
            token = self.get_token()
            if not self.check_token(BlockEntryToken, BlockEndToken):
                self.states.append(self.parse_block_sequence_entry)
                return self.parse_block_node()
            else:
                # '-' with no node after it: an empty entry.
                self.state = self.parse_block_sequence_entry
                return self.process_empty_scalar(token.end_mark)
        if not self.check_token(BlockEndToken):
            token = self.peek_token()
            raise ParserError("while parsing a block collection", self.marks[-1],
                    "expected <block end>, but found %r" % token.id, token.start_mark)
        token = self.get_token()
        event = SequenceEndEvent(token.start_mark, token.end_mark)
        self.state = self.states.pop()
        self.marks.pop()
        return event

    # indentless_sequence ::= (BLOCK-ENTRY block_node?)+

    def parse_indentless_sequence_entry(self):
        if self.check_token(BlockEntryToken):
            token = self.get_token()
            if not self.check_token(BlockEntryToken,
                    KeyToken, ValueToken, BlockEndToken):
                self.states.append(self.parse_indentless_sequence_entry)
                return self.parse_block_node()
            else:
                self.state = self.parse_indentless_sequence_entry
                return self.process_empty_scalar(token.end_mark)
        # No BLOCK-END token exists for indentless sequences; the end event
        # is zero-width at the next token's start.
        token = self.peek_token()
        event = SequenceEndEvent(token.start_mark, token.start_mark)
        self.state = self.states.pop()
        return event

    # block_mapping     ::= BLOCK-MAPPING_START
    #                       ((KEY block_node_or_indentless_sequence?)?
    #                       (VALUE block_node_or_indentless_sequence?)?)*
    #                       BLOCK-END

    def parse_block_mapping_first_key(self):
        token = self.get_token()
        self.marks.append(token.start_mark)
        return self.parse_block_mapping_key()

    def parse_block_mapping_key(self):
        if self.check_token(KeyToken):
            token = self.get_token()
            if not self.check_token(KeyToken, ValueToken, BlockEndToken):
                self.states.append(self.parse_block_mapping_value)
                return self.parse_block_node_or_indentless_sequence()
            else:
                self.state = self.parse_block_mapping_value
                return self.process_empty_scalar(token.end_mark)
        if not self.check_token(BlockEndToken):
            token = self.peek_token()
            raise ParserError("while parsing a block mapping", self.marks[-1],
                    "expected <block end>, but found %r" % token.id, token.start_mark)
        token = self.get_token()
        event = MappingEndEvent(token.start_mark, token.end_mark)
        self.state = self.states.pop()
        self.marks.pop()
        return event

    def parse_block_mapping_value(self):
        if self.check_token(ValueToken):
            token = self.get_token()
            if not self.check_token(KeyToken, ValueToken, BlockEndToken):
                self.states.append(self.parse_block_mapping_key)
                return self.parse_block_node_or_indentless_sequence()
            else:
                self.state = self.parse_block_mapping_key
                return self.process_empty_scalar(token.end_mark)
        else:
            # Missing value: synthesize an empty scalar.
            self.state = self.parse_block_mapping_key
            token = self.peek_token()
            return self.process_empty_scalar(token.start_mark)

    # flow_sequence     ::= FLOW-SEQUENCE-START
    #                       (flow_sequence_entry FLOW-ENTRY)*
    #                       flow_sequence_entry?
    #                       FLOW-SEQUENCE-END
    # flow_sequence_entry ::= flow_node | KEY flow_node? (VALUE flow_node?)?
    #
    # Note that while production rules for both flow_sequence_entry and
    # flow_mapping_entry are equal, their interpretations are different.
    # For `flow_sequence_entry`, the part `KEY flow_node? (VALUE flow_node?)?`
    # generate an inline mapping (set syntax).

    def parse_flow_sequence_first_entry(self):
        token = self.get_token()
        self.marks.append(token.start_mark)
        return self.parse_flow_sequence_entry(first=True)

    def parse_flow_sequence_entry(self, first=False):
        if not self.check_token(FlowSequenceEndToken):
            if not first:
                # Entries after the first must be separated by ','.
                if self.check_token(FlowEntryToken):
                    self.get_token()
                else:
                    token = self.peek_token()
                    raise ParserError("while parsing a flow sequence", self.marks[-1],
                            "expected ',' or ']', but got %r" % token.id, token.start_mark)

            if self.check_token(KeyToken):
                # '?' inside a flow sequence starts a single-pair mapping.
                token = self.peek_token()
                event = MappingStartEvent(None, None, True,
                        token.start_mark, token.end_mark,
                        flow_style=True)
                self.state = self.parse_flow_sequence_entry_mapping_key
                return event
            elif not self.check_token(FlowSequenceEndToken):
                self.states.append(self.parse_flow_sequence_entry)
                return self.parse_flow_node()
        token = self.get_token()
        event = SequenceEndEvent(token.start_mark, token.end_mark)
        self.state = self.states.pop()
        self.marks.pop()
        return event

    def parse_flow_sequence_entry_mapping_key(self):
        token = self.get_token()
        if not self.check_token(ValueToken,
                FlowEntryToken, FlowSequenceEndToken):
            self.states.append(self.parse_flow_sequence_entry_mapping_value)
            return self.parse_flow_node()
        else:
            self.state = self.parse_flow_sequence_entry_mapping_value
            return self.process_empty_scalar(token.end_mark)

    def parse_flow_sequence_entry_mapping_value(self):
        if self.check_token(ValueToken):
            token = self.get_token()
            if not self.check_token(FlowEntryToken, FlowSequenceEndToken):
                self.states.append(self.parse_flow_sequence_entry_mapping_end)
                return self.parse_flow_node()
            else:
                self.state = self.parse_flow_sequence_entry_mapping_end
                return self.process_empty_scalar(token.end_mark)
        else:
            self.state = self.parse_flow_sequence_entry_mapping_end
            token = self.peek_token()
            return self.process_empty_scalar(token.start_mark)

    def parse_flow_sequence_entry_mapping_end(self):
        # Zero-width MappingEndEvent closing the inline single-pair mapping.
        self.state = self.parse_flow_sequence_entry
        token = self.peek_token()
        return MappingEndEvent(token.start_mark, token.start_mark)

    # flow_mapping  ::= FLOW-MAPPING-START
    #                   (flow_mapping_entry FLOW-ENTRY)*
    #                   flow_mapping_entry?
    #                   FLOW-MAPPING-END
    # flow_mapping_entry ::= flow_node | KEY flow_node? (VALUE flow_node?)?

    def parse_flow_mapping_first_key(self):
        token = self.get_token()
        self.marks.append(token.start_mark)
        return self.parse_flow_mapping_key(first=True)

    def parse_flow_mapping_key(self, first=False):
        if not self.check_token(FlowMappingEndToken):
            if not first:
                # Entries after the first must be separated by ','.
                if self.check_token(FlowEntryToken):
                    self.get_token()
                else:
                    token = self.peek_token()
                    raise ParserError("while parsing a flow mapping", self.marks[-1],
                            "expected ',' or '}', but got %r" % token.id, token.start_mark)
            if self.check_token(KeyToken):
                token = self.get_token()
                if not self.check_token(ValueToken,
                        FlowEntryToken, FlowMappingEndToken):
                    self.states.append(self.parse_flow_mapping_value)
                    return self.parse_flow_node()
                else:
                    self.state = self.parse_flow_mapping_value
                    return self.process_empty_scalar(token.end_mark)
            elif not self.check_token(FlowMappingEndToken):
                # A bare node acts as a key with an empty value.
                self.states.append(self.parse_flow_mapping_empty_value)
                return self.parse_flow_node()
        token = self.get_token()
        event = MappingEndEvent(token.start_mark, token.end_mark)
        self.state = self.states.pop()
        self.marks.pop()
        return event

    def parse_flow_mapping_value(self):
        if self.check_token(ValueToken):
            token = self.get_token()
            if not self.check_token(FlowEntryToken, FlowMappingEndToken):
                self.states.append(self.parse_flow_mapping_key)
                return self.parse_flow_node()
            else:
                self.state = self.parse_flow_mapping_key
                return self.process_empty_scalar(token.end_mark)
        else:
            self.state = self.parse_flow_mapping_key
            token = self.peek_token()
            return self.process_empty_scalar(token.start_mark)

    def parse_flow_mapping_empty_value(self):
        self.state = self.parse_flow_mapping_key
        return self.process_empty_scalar(self.peek_token().start_mark)

    def process_empty_scalar(self, mark):
        """Return a zero-width plain-implicit empty ScalarEvent at *mark*."""
        return ScalarEvent(None, None, (True, False), '', mark, mark)