Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/PIL/PdfParser.py: 16%

630 statements  

coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

import calendar
import codecs
import collections
import mmap
import os
import re
import time
import zlib


# see 7.9.2.2 Text String Type on page 86 and D.3 PDFDocEncoding Character Set
# on page 656
def encode_text(s):
    return codecs.BOM_UTF16_BE + s.encode("utf_16_be")


PDFDocEncoding = {
    0x16: "\u0017",
    0x18: "\u02D8",
    0x19: "\u02C7",
    0x1A: "\u02C6",
    0x1B: "\u02D9",
    0x1C: "\u02DD",
    0x1D: "\u02DB",
    0x1E: "\u02DA",
    0x1F: "\u02DC",
    0x80: "\u2022",
    0x81: "\u2020",
    0x82: "\u2021",
    0x83: "\u2026",
    0x84: "\u2014",
    0x85: "\u2013",
    0x86: "\u0192",
    0x87: "\u2044",
    0x88: "\u2039",
    0x89: "\u203A",
    0x8A: "\u2212",
    0x8B: "\u2030",
    0x8C: "\u201E",
    0x8D: "\u201C",
    0x8E: "\u201D",
    0x8F: "\u2018",
    0x90: "\u2019",
    0x91: "\u201A",
    0x92: "\u2122",
    0x93: "\uFB01",
    0x94: "\uFB02",
    0x95: "\u0141",
    0x96: "\u0152",
    0x97: "\u0160",
    0x98: "\u0178",
    0x99: "\u017D",
    0x9A: "\u0131",
    0x9B: "\u0142",
    0x9C: "\u0153",
    0x9D: "\u0161",
    0x9E: "\u017E",
    0xA0: "\u20AC",
}


def decode_text(b):
    if b[: len(codecs.BOM_UTF16_BE)] == codecs.BOM_UTF16_BE:
        return b[len(codecs.BOM_UTF16_BE) :].decode("utf_16_be")
    else:
        return "".join(PDFDocEncoding.get(byte, chr(byte)) for byte in b)


class PdfFormatError(RuntimeError):
    """An error that probably indicates a syntactic or semantic error in the
    PDF file structure"""

    pass


def check_format_condition(condition, error_message):
    if not condition:
        raise PdfFormatError(error_message)


class IndirectReference(
    collections.namedtuple("IndirectReferenceTuple", ["object_id", "generation"])
):
    def __str__(self):
        return "%s %s R" % self

    def __bytes__(self):
        return self.__str__().encode("us-ascii")

    def __eq__(self, other):
        return (
            other.__class__ is self.__class__
            and other.object_id == self.object_id
            and other.generation == self.generation
        )

    def __ne__(self, other):
        return not (self == other)

    def __hash__(self):
        return hash((self.object_id, self.generation))


class IndirectObjectDef(IndirectReference):
    def __str__(self):
        return "%s %s obj" % self


class XrefTable:
    def __init__(self):
        self.existing_entries = {}  # object ID => (offset, generation)
        self.new_entries = {}  # object ID => (offset, generation)
        self.deleted_entries = {0: 65536}  # object ID => generation
        self.reading_finished = False

    def __setitem__(self, key, value):
        if self.reading_finished:
            self.new_entries[key] = value
        else:
            self.existing_entries[key] = value
        if key in self.deleted_entries:
            del self.deleted_entries[key]

    def __getitem__(self, key):
        try:
            return self.new_entries[key]
        except KeyError:
            return self.existing_entries[key]

    def __delitem__(self, key):
        if key in self.new_entries:
            generation = self.new_entries[key][1] + 1
            del self.new_entries[key]
            self.deleted_entries[key] = generation
        elif key in self.existing_entries:
            generation = self.existing_entries[key][1] + 1
            self.deleted_entries[key] = generation
        elif key in self.deleted_entries:
            generation = self.deleted_entries[key]
        else:
            raise IndexError(
                "object ID " + str(key) + " cannot be deleted because it doesn't exist"
            )

    def __contains__(self, key):
        return key in self.existing_entries or key in self.new_entries

    def __len__(self):
        return len(
            set(self.existing_entries.keys())
            | set(self.new_entries.keys())
            | set(self.deleted_entries.keys())
        )

    def keys(self):
        return (
            set(self.existing_entries.keys()) - set(self.deleted_entries.keys())
        ) | set(self.new_entries.keys())

    def write(self, f):
        keys = sorted(set(self.new_entries.keys()) | set(self.deleted_entries.keys()))
        deleted_keys = sorted(set(self.deleted_entries.keys()))
        startxref = f.tell()
        f.write(b"xref\n")
        while keys:
            # find a contiguous sequence of object IDs
            prev = None
            for index, key in enumerate(keys):
                if prev is None or prev + 1 == key:
                    prev = key
                else:
                    contiguous_keys = keys[:index]
                    keys = keys[index:]
                    break
            else:
                contiguous_keys = keys
                keys = None
            f.write(b"%d %d\n" % (contiguous_keys[0], len(contiguous_keys)))
            for object_id in contiguous_keys:
                if object_id in self.new_entries:
                    f.write(b"%010d %05d n \n" % self.new_entries[object_id])
                else:
                    this_deleted_object_id = deleted_keys.pop(0)
                    check_format_condition(
                        object_id == this_deleted_object_id,
                        f"expected the next deleted object ID to be {object_id}, "
                        f"instead found {this_deleted_object_id}",
                    )
                    try:
                        next_in_linked_list = deleted_keys[0]
                    except IndexError:
                        next_in_linked_list = 0
                    f.write(
                        b"%010d %05d f \n"
                        % (next_in_linked_list, self.deleted_entries[object_id])
                    )
        return startxref


class PdfName:
    def __init__(self, name):
        if isinstance(name, PdfName):
            self.name = name.name
        elif isinstance(name, bytes):
            self.name = name
        else:
            self.name = name.encode("us-ascii")

    def name_as_str(self):
        return self.name.decode("us-ascii")

    def __eq__(self, other):
        return (
            isinstance(other, PdfName) and other.name == self.name
        ) or other == self.name

    def __hash__(self):
        return hash(self.name)

    def __repr__(self):
        return f"PdfName({repr(self.name)})"

    @classmethod
    def from_pdf_stream(cls, data):
        return cls(PdfParser.interpret_name(data))

    allowed_chars = set(range(33, 127)) - {ord(c) for c in "#%/()<>[]{}"}

    def __bytes__(self):
        result = bytearray(b"/")
        for b in self.name:
            if b in self.allowed_chars:
                result.append(b)
            else:
                result.extend(b"#%02X" % b)
        return bytes(result)


class PdfArray(list):
    def __bytes__(self):
        return b"[ " + b" ".join(pdf_repr(x) for x in self) + b" ]"


class PdfDict(collections.UserDict):
    def __setattr__(self, key, value):
        if key == "data":
            collections.UserDict.__setattr__(self, key, value)
        else:
            self[key.encode("us-ascii")] = value

    def __getattr__(self, key):
        try:
            value = self[key.encode("us-ascii")]
        except KeyError as e:
            raise AttributeError(key) from e
        if isinstance(value, bytes):
            value = decode_text(value)
        if key.endswith("Date"):
            if value.startswith("D:"):
                value = value[2:]

            relationship = "Z"
            if len(value) > 17:
                relationship = value[14]
                offset = int(value[15:17]) * 60
                if len(value) > 20:
                    offset += int(value[18:20])

            format = "%Y%m%d%H%M%S"[: len(value) - 2]
            value = time.strptime(value[: len(format) + 2], format)
            if relationship in ["+", "-"]:
                offset *= 60
                if relationship == "+":
                    offset *= -1
                value = time.gmtime(calendar.timegm(value) + offset)
        return value

    def __bytes__(self):
        out = bytearray(b"<<")
        for key, value in self.items():
            if value is None:
                continue
            value = pdf_repr(value)
            out.extend(b"\n")
            out.extend(bytes(PdfName(key)))
            out.extend(b" ")
            out.extend(value)
        out.extend(b"\n>>")
        return bytes(out)


class PdfBinary:
    def __init__(self, data):
        self.data = data

    def __bytes__(self):
        return b"<%s>" % b"".join(b"%02X" % b for b in self.data)


class PdfStream:
    def __init__(self, dictionary, buf):
        self.dictionary = dictionary
        self.buf = buf

    def decode(self):
        try:
            filter = self.dictionary.Filter
        except AttributeError:
            return self.buf
        if filter == b"FlateDecode":
            try:
                expected_length = self.dictionary.DL
            except AttributeError:
                expected_length = self.dictionary.Length
            return zlib.decompress(self.buf, bufsize=int(expected_length))
        else:
            raise NotImplementedError(
                f"stream filter {repr(self.dictionary.Filter)} unknown/unsupported"
            )


def pdf_repr(x):
    if x is True:
        return b"true"
    elif x is False:
        return b"false"
    elif x is None:
        return b"null"
    elif isinstance(x, (PdfName, PdfDict, PdfArray, PdfBinary)):
        return bytes(x)
    elif isinstance(x, int):
        return str(x).encode("us-ascii")
    elif isinstance(x, float):
        return str(x).encode("us-ascii")
    elif isinstance(x, time.struct_time):
        return b"(D:" + time.strftime("%Y%m%d%H%M%SZ", x).encode("us-ascii") + b")"
    elif isinstance(x, dict):
        return bytes(PdfDict(x))
    elif isinstance(x, list):
        return bytes(PdfArray(x))
    elif isinstance(x, str):
        return pdf_repr(encode_text(x))
    elif isinstance(x, bytes):
        # XXX escape more chars? handle binary garbage
        x = x.replace(b"\\", b"\\\\")
        x = x.replace(b"(", b"\\(")
        x = x.replace(b")", b"\\)")
        return b"(" + x + b")"
    else:
        return bytes(x)


class PdfParser:
    """Based on
    https://www.adobe.com/content/dam/acom/en/devnet/acrobat/pdfs/PDF32000_2008.pdf
    Supports PDF up to 1.4
    """

    def __init__(self, filename=None, f=None, buf=None, start_offset=0, mode="rb"):
        if buf and f:
            raise RuntimeError("specify buf or f or filename, but not both buf and f")
        self.filename = filename
        self.buf = buf
        self.f = f
        self.start_offset = start_offset
        self.should_close_buf = False
        self.should_close_file = False
        if filename is not None and f is None:
            self.f = f = open(filename, mode)
            self.should_close_file = True
        if f is not None:
            self.buf = buf = self.get_buf_from_file(f)
            self.should_close_buf = True
            if not filename and hasattr(f, "name"):
                self.filename = f.name
        self.cached_objects = {}
        if buf:
            self.read_pdf_info()
        else:
            self.file_size_total = self.file_size_this = 0
            self.root = PdfDict()
            self.root_ref = None
            self.info = PdfDict()
            self.info_ref = None
            self.page_tree_root = {}
            self.pages = []
            self.orig_pages = []
            self.pages_ref = None
            self.last_xref_section_offset = None
            self.trailer_dict = {}
            self.xref_table = XrefTable()
            self.xref_table.reading_finished = True
            if f:
                self.seek_end()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False  # do not suppress exceptions

    def start_writing(self):
        self.close_buf()
        self.seek_end()

    def close_buf(self):
        try:
            self.buf.close()
        except AttributeError:
            pass
        self.buf = None

    def close(self):
        if self.should_close_buf:
            self.close_buf()
        if self.f is not None and self.should_close_file:
            self.f.close()
            self.f = None

    def seek_end(self):
        self.f.seek(0, os.SEEK_END)

    def write_header(self):
        self.f.write(b"%PDF-1.4\n")

    def write_comment(self, s):
        self.f.write(f"% {s}\n".encode())

    def write_catalog(self):
        self.del_root()
        self.root_ref = self.next_object_id(self.f.tell())
        self.pages_ref = self.next_object_id(0)
        self.rewrite_pages()
        self.write_obj(self.root_ref, Type=PdfName(b"Catalog"), Pages=self.pages_ref)
        self.write_obj(
            self.pages_ref,
            Type=PdfName(b"Pages"),
            Count=len(self.pages),
            Kids=self.pages,
        )
        return self.root_ref

    def rewrite_pages(self):
        pages_tree_nodes_to_delete = []
        for i, page_ref in enumerate(self.orig_pages):
            page_info = self.cached_objects[page_ref]
            del self.xref_table[page_ref.object_id]
            pages_tree_nodes_to_delete.append(page_info[PdfName(b"Parent")])
            if page_ref not in self.pages:
                # the page has been deleted
                continue
            # make dict keys into strings for passing to write_page
            stringified_page_info = {}
            for key, value in page_info.items():
                # key should be a PdfName
                stringified_page_info[key.name_as_str()] = value
            stringified_page_info["Parent"] = self.pages_ref
            new_page_ref = self.write_page(None, **stringified_page_info)
            for j, cur_page_ref in enumerate(self.pages):
                if cur_page_ref == page_ref:
                    # replace the page reference with the new one
                    self.pages[j] = new_page_ref
        # delete redundant Pages tree nodes from xref table
        for pages_tree_node_ref in pages_tree_nodes_to_delete:
            while pages_tree_node_ref:
                pages_tree_node = self.cached_objects[pages_tree_node_ref]
                if pages_tree_node_ref.object_id in self.xref_table:
                    del self.xref_table[pages_tree_node_ref.object_id]
                pages_tree_node_ref = pages_tree_node.get(b"Parent", None)
        self.orig_pages = []

    def write_xref_and_trailer(self, new_root_ref=None):
        if new_root_ref:
            self.del_root()
            self.root_ref = new_root_ref
        if self.info:
            self.info_ref = self.write_obj(None, self.info)
        start_xref = self.xref_table.write(self.f)
        num_entries = len(self.xref_table)
        trailer_dict = {b"Root": self.root_ref, b"Size": num_entries}
        if self.last_xref_section_offset is not None:
            trailer_dict[b"Prev"] = self.last_xref_section_offset
        if self.info:
            trailer_dict[b"Info"] = self.info_ref
        self.last_xref_section_offset = start_xref
        self.f.write(
            b"trailer\n"
            + bytes(PdfDict(trailer_dict))
            + b"\nstartxref\n%d\n%%%%EOF" % start_xref
        )

    def write_page(self, ref, *objs, **dict_obj):
        if isinstance(ref, int):
            ref = self.pages[ref]
        if "Type" not in dict_obj:
            dict_obj["Type"] = PdfName(b"Page")
        if "Parent" not in dict_obj:
            dict_obj["Parent"] = self.pages_ref
        return self.write_obj(ref, *objs, **dict_obj)

    def write_obj(self, ref, *objs, **dict_obj):
        f = self.f
        if ref is None:
            ref = self.next_object_id(f.tell())
        else:
            self.xref_table[ref.object_id] = (f.tell(), ref.generation)
        f.write(bytes(IndirectObjectDef(*ref)))
        stream = dict_obj.pop("stream", None)
        if stream is not None:
            dict_obj["Length"] = len(stream)
        if dict_obj:
            f.write(pdf_repr(dict_obj))
        for obj in objs:
            f.write(pdf_repr(obj))
        if stream is not None:
            f.write(b"stream\n")
            f.write(stream)
            f.write(b"\nendstream\n")
        f.write(b"endobj\n")
        return ref

    def del_root(self):
        if self.root_ref is None:
            return
        del self.xref_table[self.root_ref.object_id]
        del self.xref_table[self.root[b"Pages"].object_id]

    @staticmethod
    def get_buf_from_file(f):
        if hasattr(f, "getbuffer"):
            return f.getbuffer()
        elif hasattr(f, "getvalue"):
            return f.getvalue()
        else:
            try:
                return mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
            except ValueError:  # cannot mmap an empty file
                return b""

    def read_pdf_info(self):
        self.file_size_total = len(self.buf)
        self.file_size_this = self.file_size_total - self.start_offset
        self.read_trailer()
        self.root_ref = self.trailer_dict[b"Root"]
        self.info_ref = self.trailer_dict.get(b"Info", None)
        self.root = PdfDict(self.read_indirect(self.root_ref))
        if self.info_ref is None:
            self.info = PdfDict()
        else:
            self.info = PdfDict(self.read_indirect(self.info_ref))
        check_format_condition(b"Type" in self.root, "/Type missing in Root")
        check_format_condition(
            self.root[b"Type"] == b"Catalog", "/Type in Root is not /Catalog"
        )
        check_format_condition(b"Pages" in self.root, "/Pages missing in Root")
        check_format_condition(
            isinstance(self.root[b"Pages"], IndirectReference),
            "/Pages in Root is not an indirect reference",
        )
        self.pages_ref = self.root[b"Pages"]
        self.page_tree_root = self.read_indirect(self.pages_ref)
        self.pages = self.linearize_page_tree(self.page_tree_root)
        # save the original list of page references
        # in case the user modifies, adds or deletes some pages
        # and we need to rewrite the pages and their list
        self.orig_pages = self.pages[:]

    def next_object_id(self, offset=None):
        try:
            # TODO: support reuse of deleted objects
            reference = IndirectReference(max(self.xref_table.keys()) + 1, 0)
        except ValueError:
            reference = IndirectReference(1, 0)
        if offset is not None:
            self.xref_table[reference.object_id] = (offset, 0)
        return reference

    delimiter = rb"[][()<>{}/%]"
    delimiter_or_ws = rb"[][()<>{}/%\000\011\012\014\015\040]"
    whitespace = rb"[\000\011\012\014\015\040]"
    whitespace_or_hex = rb"[\000\011\012\014\015\0400-9a-fA-F]"
    whitespace_optional = whitespace + b"*"
    whitespace_mandatory = whitespace + b"+"
    # No "\012" aka "\n" or "\015" aka "\r":
    whitespace_optional_no_nl = rb"[\000\011\014\040]*"
    newline_only = rb"[\r\n]+"
    newline = whitespace_optional_no_nl + newline_only + whitespace_optional_no_nl
    re_trailer_end = re.compile(
        whitespace_mandatory
        + rb"trailer"
        + whitespace_optional
        + rb"<<(.*>>)"
        + newline
        + rb"startxref"
        + newline
        + rb"([0-9]+)"
        + newline
        + rb"%%EOF"
        + whitespace_optional
        + rb"$",
        re.DOTALL,
    )
    re_trailer_prev = re.compile(
        whitespace_optional
        + rb"trailer"
        + whitespace_optional
        + rb"<<(.*?>>)"
        + newline
        + rb"startxref"
        + newline
        + rb"([0-9]+)"
        + newline
        + rb"%%EOF"
        + whitespace_optional,
        re.DOTALL,
    )

    def read_trailer(self):
        search_start_offset = len(self.buf) - 16384
        if search_start_offset < self.start_offset:
            search_start_offset = self.start_offset
        m = self.re_trailer_end.search(self.buf, search_start_offset)
        check_format_condition(m, "trailer end not found")
        # make sure we found the LAST trailer
        last_match = m
        while m:
            last_match = m
            m = self.re_trailer_end.search(self.buf, m.start() + 16)
        if not m:
            m = last_match
        trailer_data = m.group(1)
        self.last_xref_section_offset = int(m.group(2))
        self.trailer_dict = self.interpret_trailer(trailer_data)
        self.xref_table = XrefTable()
        self.read_xref_table(xref_section_offset=self.last_xref_section_offset)
        if b"Prev" in self.trailer_dict:
            self.read_prev_trailer(self.trailer_dict[b"Prev"])

    def read_prev_trailer(self, xref_section_offset):
        trailer_offset = self.read_xref_table(xref_section_offset=xref_section_offset)
        m = self.re_trailer_prev.search(
            self.buf[trailer_offset : trailer_offset + 16384]
        )
        check_format_condition(m, "previous trailer not found")
        trailer_data = m.group(1)
        check_format_condition(
            int(m.group(2)) == xref_section_offset,
            "xref section offset in previous trailer doesn't match what was expected",
        )
        trailer_dict = self.interpret_trailer(trailer_data)
        if b"Prev" in trailer_dict:
            self.read_prev_trailer(trailer_dict[b"Prev"])

    re_whitespace_optional = re.compile(whitespace_optional)
    re_name = re.compile(
        whitespace_optional
        + rb"/([!-$&'*-.0-;=?-Z\\^-z|~]+)(?="
        + delimiter_or_ws
        + rb")"
    )
    re_dict_start = re.compile(whitespace_optional + rb"<<")
    re_dict_end = re.compile(whitespace_optional + rb">>" + whitespace_optional)

    @classmethod
    def interpret_trailer(cls, trailer_data):
        trailer = {}
        offset = 0
        while True:
            m = cls.re_name.match(trailer_data, offset)
            if not m:
                m = cls.re_dict_end.match(trailer_data, offset)
                check_format_condition(
                    m and m.end() == len(trailer_data),
                    "name not found in trailer, remaining data: "
                    + repr(trailer_data[offset:]),
                )
                break
            key = cls.interpret_name(m.group(1))
            value, offset = cls.get_value(trailer_data, m.end())
            trailer[key] = value
        check_format_condition(
            b"Size" in trailer and isinstance(trailer[b"Size"], int),
            "/Size not in trailer or not an integer",
        )
        check_format_condition(
            b"Root" in trailer and isinstance(trailer[b"Root"], IndirectReference),
            "/Root not in trailer or not an indirect reference",
        )
        return trailer

    re_hashes_in_name = re.compile(rb"([^#]*)(#([0-9a-fA-F]{2}))?")

    @classmethod
    def interpret_name(cls, raw, as_text=False):
        name = b""
        for m in cls.re_hashes_in_name.finditer(raw):
            if m.group(3):
                name += m.group(1) + bytearray.fromhex(m.group(3).decode("us-ascii"))
            else:
                name += m.group(1)
        if as_text:
            return name.decode("utf-8")
        else:
            return bytes(name)

    re_null = re.compile(whitespace_optional + rb"null(?=" + delimiter_or_ws + rb")")
    re_true = re.compile(whitespace_optional + rb"true(?=" + delimiter_or_ws + rb")")
    re_false = re.compile(whitespace_optional + rb"false(?=" + delimiter_or_ws + rb")")
    re_int = re.compile(
        whitespace_optional + rb"([-+]?[0-9]+)(?=" + delimiter_or_ws + rb")"
    )
    re_real = re.compile(
        whitespace_optional
        + rb"([-+]?([0-9]+\.[0-9]*|[0-9]*\.[0-9]+))(?="
        + delimiter_or_ws
        + rb")"
    )
    re_array_start = re.compile(whitespace_optional + rb"\[")
    re_array_end = re.compile(whitespace_optional + rb"]")
    re_string_hex = re.compile(
        whitespace_optional + rb"<(" + whitespace_or_hex + rb"*)>"
    )
    re_string_lit = re.compile(whitespace_optional + rb"\(")
    re_indirect_reference = re.compile(
        whitespace_optional
        + rb"([-+]?[0-9]+)"
        + whitespace_mandatory
        + rb"([-+]?[0-9]+)"
        + whitespace_mandatory
        + rb"R(?="
        + delimiter_or_ws
        + rb")"
    )
    re_indirect_def_start = re.compile(
        whitespace_optional
        + rb"([-+]?[0-9]+)"
        + whitespace_mandatory
        + rb"([-+]?[0-9]+)"
        + whitespace_mandatory
        + rb"obj(?="
        + delimiter_or_ws
        + rb")"
    )
    re_indirect_def_end = re.compile(
        whitespace_optional + rb"endobj(?=" + delimiter_or_ws + rb")"
    )
    re_comment = re.compile(
        rb"(" + whitespace_optional + rb"%[^\r\n]*" + newline + rb")*"
    )
    re_stream_start = re.compile(whitespace_optional + rb"stream\r?\n")
    re_stream_end = re.compile(
        whitespace_optional + rb"endstream(?=" + delimiter_or_ws + rb")"
    )

    @classmethod
    def get_value(cls, data, offset, expect_indirect=None, max_nesting=-1):
        if max_nesting == 0:
            return None, None
        m = cls.re_comment.match(data, offset)
        if m:
            offset = m.end()
        m = cls.re_indirect_def_start.match(data, offset)
        if m:
            check_format_condition(
                int(m.group(1)) > 0,
                "indirect object definition: object ID must be greater than 0",
            )
            check_format_condition(
                int(m.group(2)) >= 0,
                "indirect object definition: generation must be non-negative",
            )
            check_format_condition(
                expect_indirect is None
                or expect_indirect
                == IndirectReference(int(m.group(1)), int(m.group(2))),
                "indirect object definition different than expected",
            )
            object, offset = cls.get_value(data, m.end(), max_nesting=max_nesting - 1)
            if offset is None:
                return object, None
            m = cls.re_indirect_def_end.match(data, offset)
            check_format_condition(m, "indirect object definition end not found")
            return object, m.end()
        check_format_condition(
            not expect_indirect, "indirect object definition not found"
        )
        m = cls.re_indirect_reference.match(data, offset)
        if m:
            check_format_condition(
                int(m.group(1)) > 0,
                "indirect object reference: object ID must be greater than 0",
            )
            check_format_condition(
                int(m.group(2)) >= 0,
                "indirect object reference: generation must be non-negative",
            )
            return IndirectReference(int(m.group(1)), int(m.group(2))), m.end()
        m = cls.re_dict_start.match(data, offset)
        if m:
            offset = m.end()
            result = {}
            m = cls.re_dict_end.match(data, offset)
            while not m:
                key, offset = cls.get_value(data, offset, max_nesting=max_nesting - 1)
                if offset is None:
                    return result, None
                value, offset = cls.get_value(data, offset, max_nesting=max_nesting - 1)
                result[key] = value
                if offset is None:
                    return result, None
                m = cls.re_dict_end.match(data, offset)
            offset = m.end()
            m = cls.re_stream_start.match(data, offset)
            if m:
                try:
                    stream_len = int(result[b"Length"])
                except (TypeError, KeyError, ValueError) as e:
                    raise PdfFormatError(
                        "bad or missing Length in stream dict (%r)"
                        % result.get(b"Length", None)
                    ) from e
                stream_data = data[m.end() : m.end() + stream_len]
                m = cls.re_stream_end.match(data, m.end() + stream_len)
                check_format_condition(m, "stream end not found")
                offset = m.end()
                result = PdfStream(PdfDict(result), stream_data)
            else:
                result = PdfDict(result)
            return result, offset
        m = cls.re_array_start.match(data, offset)
        if m:
            offset = m.end()
            result = []
            m = cls.re_array_end.match(data, offset)
            while not m:
                value, offset = cls.get_value(data, offset, max_nesting=max_nesting - 1)
                result.append(value)
                if offset is None:
                    return result, None
                m = cls.re_array_end.match(data, offset)
            return result, m.end()
        m = cls.re_null.match(data, offset)
        if m:
            return None, m.end()
        m = cls.re_true.match(data, offset)
        if m:
            return True, m.end()
        m = cls.re_false.match(data, offset)
        if m:
            return False, m.end()
        m = cls.re_name.match(data, offset)
        if m:
            return PdfName(cls.interpret_name(m.group(1))), m.end()
        m = cls.re_int.match(data, offset)
        if m:
            return int(m.group(1)), m.end()
        m = cls.re_real.match(data, offset)
        if m:
            # XXX Decimal instead of float???
            return float(m.group(1)), m.end()
        m = cls.re_string_hex.match(data, offset)
        if m:
            # filter out whitespace
            hex_string = bytearray(
                b for b in m.group(1) if b in b"0123456789abcdefABCDEF"
            )
            if len(hex_string) % 2 == 1:
                # append a 0 if the length is not even - yes, at the end
                hex_string.append(ord(b"0"))
            return bytearray.fromhex(hex_string.decode("us-ascii")), m.end()
        m = cls.re_string_lit.match(data, offset)
        if m:
            return cls.get_literal_string(data, m.end())
        # return None, offset  # fallback (only for debugging)
        raise PdfFormatError("unrecognized object: " + repr(data[offset : offset + 32]))

    re_lit_str_token = re.compile(
        rb"(\\[nrtbf()\\])|(\\[0-9]{1,3})|(\\(\r\n|\r|\n))|(\r\n|\r|\n)|(\()|(\))"
    )
    escaped_chars = {
        b"n": b"\n",
        b"r": b"\r",
        b"t": b"\t",
        b"b": b"\b",
        b"f": b"\f",
        b"(": b"(",
        b")": b")",
        b"\\": b"\\",
        ord(b"n"): b"\n",
        ord(b"r"): b"\r",
        ord(b"t"): b"\t",
        ord(b"b"): b"\b",
        ord(b"f"): b"\f",
        ord(b"("): b"(",
        ord(b")"): b")",
        ord(b"\\"): b"\\",
    }

    @classmethod
    def get_literal_string(cls, data, offset):
        nesting_depth = 0
        result = bytearray()
        for m in cls.re_lit_str_token.finditer(data, offset):
            result.extend(data[offset : m.start()])
            if m.group(1):
                result.extend(cls.escaped_chars[m.group(1)[1]])
            elif m.group(2):
                result.append(int(m.group(2)[1:], 8))
            elif m.group(3):
                pass
            elif m.group(5):
                result.extend(b"\n")
            elif m.group(6):
                result.extend(b"(")
                nesting_depth += 1
            elif m.group(7):
                if nesting_depth == 0:
                    return bytes(result), m.end()
                result.extend(b")")
                nesting_depth -= 1
            offset = m.end()
        raise PdfFormatError("unfinished literal string")

    re_xref_section_start = re.compile(whitespace_optional + rb"xref" + newline)
    re_xref_subsection_start = re.compile(
        whitespace_optional
        + rb"([0-9]+)"
        + whitespace_mandatory
        + rb"([0-9]+)"
        + whitespace_optional
        + newline_only
    )
    re_xref_entry = re.compile(rb"([0-9]{10}) ([0-9]{5}) ([fn])( \r| \n|\r\n)")

    def read_xref_table(self, xref_section_offset):
        subsection_found = False
        m = self.re_xref_section_start.match(
            self.buf, xref_section_offset + self.start_offset
        )
        check_format_condition(m, "xref section start not found")
        offset = m.end()
        while True:
            m = self.re_xref_subsection_start.match(self.buf, offset)
            if not m:
                check_format_condition(
                    subsection_found, "xref subsection start not found"
                )
                break
            subsection_found = True
            offset = m.end()
            first_object = int(m.group(1))
            num_objects = int(m.group(2))
            for i in range(first_object, first_object + num_objects):
                m = self.re_xref_entry.match(self.buf, offset)
                check_format_condition(m, "xref entry not found")
                offset = m.end()
                is_free = m.group(3) == b"f"
                generation = int(m.group(2))
                if not is_free:
                    new_entry = (int(m.group(1)), generation)
                    check_format_condition(
                        i not in self.xref_table or self.xref_table[i] == new_entry,
                        "xref entry duplicated (and not identical)",
                    )
                    self.xref_table[i] = new_entry
        return offset

    def read_indirect(self, ref, max_nesting=-1):
        offset, generation = self.xref_table[ref[0]]
        check_format_condition(
            generation == ref[1],
            f"expected to find generation {ref[1]} for object ID {ref[0]} in xref "
            f"table, instead found generation {generation} at offset {offset}",
        )
        value = self.get_value(
            self.buf,
            offset + self.start_offset,
            expect_indirect=IndirectReference(*ref),
            max_nesting=max_nesting,
        )[0]
        self.cached_objects[ref] = value
        return value

    def linearize_page_tree(self, node=None):
        if node is None:
            node = self.page_tree_root
        check_format_condition(
            node[b"Type"] == b"Pages", "/Type of page tree node is not /Pages"
        )
        pages = []
        for kid in node[b"Kids"]:
            kid_object = self.read_indirect(kid)
            if kid_object[b"Type"] == b"Page":
                pages.append(kid)
            else:
                pages.extend(self.linearize_page_tree(node=kid_object))
        return pages
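
For orientation, here is a minimal usage sketch of the parser listed above. It is not part of PdfParser.py: PdfParser is normally driven internally by Pillow's PDF plugin, and "example.pdf" is a hypothetical path used only for illustration.

# a minimal sketch, assuming a readable PDF exists at the hypothetical path "example.pdf"
from PIL.PdfParser import PdfParser

with PdfParser(filename="example.pdf") as parser:
    # the constructor maps the file into memory and calls read_pdf_info(),
    # so the trailer, xref table and page tree are already parsed here
    print(len(parser.pages))  # number of page references found in the page tree
    print(parser.info)        # document /Info dictionary (a PdfDict), possibly empty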