Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/PIL/PdfParser.py: 16%
630 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1import calendar
2import codecs
3import collections
4import mmap
5import os
6import re
7import time
8import zlib
11# see 7.9.2.2 Text String Type on page 86 and D.3 PDFDocEncoding Character Set
12# on page 656
def encode_text(s):
    """Encode a string as a PDF text string: UTF-16BE prefixed with a BOM."""
    encoded = s.encode("utf_16_be")
    return codecs.BOM_UTF16_BE + encoded
# Mapping of the PDFDocEncoding code points that differ from their Latin-1
# values to the corresponding Unicode characters (PDF 32000-1:2008, Annex
# D.3).  Bytes not listed here decode as chr(byte) — see decode_text().
PDFDocEncoding = {
    0x16: "\u0017",
    0x18: "\u02D8",
    0x19: "\u02C7",
    0x1A: "\u02C6",
    0x1B: "\u02D9",
    0x1C: "\u02DD",
    0x1D: "\u02DB",
    0x1E: "\u02DA",
    0x1F: "\u02DC",
    0x80: "\u2022",
    0x81: "\u2020",
    0x82: "\u2021",
    0x83: "\u2026",
    0x84: "\u2014",
    0x85: "\u2013",
    0x86: "\u0192",
    0x87: "\u2044",
    0x88: "\u2039",
    0x89: "\u203A",
    0x8A: "\u2212",
    0x8B: "\u2030",
    0x8C: "\u201E",
    0x8D: "\u201C",
    0x8E: "\u201D",
    0x8F: "\u2018",
    0x90: "\u2019",
    0x91: "\u201A",
    0x92: "\u2122",
    0x93: "\uFB01",
    0x94: "\uFB02",
    0x95: "\u0141",
    0x96: "\u0152",
    0x97: "\u0160",
    0x98: "\u0178",
    0x99: "\u017D",
    0x9A: "\u0131",
    0x9B: "\u0142",
    0x9C: "\u0153",
    0x9D: "\u0161",
    0x9E: "\u017E",
    0xA0: "\u20AC",
}
def decode_text(b):
    """Decode a PDF text string: UTF-16BE if BOM-prefixed, else PDFDocEncoding."""
    bom = codecs.BOM_UTF16_BE
    if b.startswith(bom):
        return b[len(bom):].decode("utf_16_be")
    return "".join(PDFDocEncoding.get(byte, chr(byte)) for byte in b)
class PdfFormatError(RuntimeError):
    """Raised when the PDF file structure looks syntactically or semantically
    invalid."""
def check_format_condition(condition, error_message):
    """Raise PdfFormatError(error_message) unless *condition* is truthy."""
    if condition:
        return
    raise PdfFormatError(error_message)
class IndirectReference(
    collections.namedtuple("IndirectReferenceTuple", ["object_id", "generation"])
):
    """A reference to an indirect PDF object, serialized as ``<id> <gen> R``."""

    def __str__(self):
        return f"{self.object_id} {self.generation} R"

    def __bytes__(self):
        return str(self).encode("us-ascii")

    def __eq__(self, other):
        # Only references of exactly the same class compare equal.
        if other.__class__ is not self.__class__:
            return False
        return (other.object_id, other.generation) == (
            self.object_id,
            self.generation,
        )

    def __ne__(self, other):
        return not (self == other)

    def __hash__(self):
        return hash((self.object_id, self.generation))
class IndirectObjectDef(IndirectReference):
    """The header of an indirect object definition: ``<id> <gen> obj``."""

    def __str__(self):
        return f"{self.object_id} {self.generation} obj"
class XrefTable:
    """A PDF cross-reference table.

    Tracks three disjoint views of object IDs: entries read from an existing
    file, entries newly written in this session, and deleted entries (which
    form the xref free list).  Lookup prefers new entries over existing ones.
    """

    def __init__(self):
        self.existing_entries = {}  # object ID => (offset, generation)
        self.new_entries = {}  # object ID => (offset, generation)
        self.deleted_entries = {0: 65536}  # object ID => generation
        # While False, assignments record entries parsed from the file;
        # afterwards they record entries to be written out.
        self.reading_finished = False

    def __setitem__(self, key, value):
        if self.reading_finished:
            self.new_entries[key] = value
        else:
            self.existing_entries[key] = value
        # (Re)defining an object revives it if it was marked deleted.
        if key in self.deleted_entries:
            del self.deleted_entries[key]

    def __getitem__(self, key):
        # New entries shadow existing ones.
        try:
            return self.new_entries[key]
        except KeyError:
            return self.existing_entries[key]

    def __delitem__(self, key):
        # Deleting bumps the generation number, per the xref free-list rules.
        if key in self.new_entries:
            generation = self.new_entries[key][1] + 1
            del self.new_entries[key]
            self.deleted_entries[key] = generation
        elif key in self.existing_entries:
            # existing_entries is left untouched: it mirrors the file on disk.
            generation = self.existing_entries[key][1] + 1
            self.deleted_entries[key] = generation
        elif key in self.deleted_entries:
            # Already deleted: a no-op (generation stays as recorded).
            generation = self.deleted_entries[key]
        else:
            raise IndexError(
                "object ID " + str(key) + " cannot be deleted because it doesn't exist"
            )

    def __contains__(self, key):
        return key in self.existing_entries or key in self.new_entries

    def __len__(self):
        # Count every object ID ever seen, including deleted ones.
        return len(
            set(self.existing_entries.keys())
            | set(self.new_entries.keys())
            | set(self.deleted_entries.keys())
        )

    def keys(self):
        # Live object IDs: existing-minus-deleted, plus all new entries.
        return (
            set(self.existing_entries.keys()) - set(self.deleted_entries.keys())
        ) | set(self.new_entries.keys())

    def write(self, f):
        """Write an xref section covering new and deleted entries to *f*.

        Returns the file offset of the section start (for ``startxref``).
        Deleted entries are chained into the free list in ascending ID order.
        """
        keys = sorted(set(self.new_entries.keys()) | set(self.deleted_entries.keys()))
        deleted_keys = sorted(set(self.deleted_entries.keys()))
        startxref = f.tell()
        f.write(b"xref\n")
        while keys:
            # find a contiguous sequence of object IDs
            prev = None
            for index, key in enumerate(keys):
                if prev is None or prev + 1 == key:
                    prev = key
                else:
                    contiguous_keys = keys[:index]
                    keys = keys[index:]
                    break
            else:
                contiguous_keys = keys
                keys = None
            # subsection header: first object ID and entry count
            f.write(b"%d %d\n" % (contiguous_keys[0], len(contiguous_keys)))
            for object_id in contiguous_keys:
                if object_id in self.new_entries:
                    f.write(b"%010d %05d n \n" % self.new_entries[object_id])
                else:
                    this_deleted_object_id = deleted_keys.pop(0)
                    check_format_condition(
                        object_id == this_deleted_object_id,
                        f"expected the next deleted object ID to be {object_id}, "
                        f"instead found {this_deleted_object_id}",
                    )
                    # Free entries link to the next free object (0 terminates).
                    try:
                        next_in_linked_list = deleted_keys[0]
                    except IndexError:
                        next_in_linked_list = 0
                    f.write(
                        b"%010d %05d f \n"
                        % (next_in_linked_list, self.deleted_entries[object_id])
                    )
        return startxref
class PdfName:
    """A PDF name object (``/Name``); the bytes are stored without the slash."""

    # Regular characters per the spec: printable ASCII minus delimiters,
    # '#' and '%'.  Anything else is written as a #XX hex escape.
    allowed_chars = set(range(33, 127)) - {ord(c) for c in "#%/()<>[]{}"}

    def __init__(self, name):
        if isinstance(name, PdfName):
            self.name = name.name
        elif isinstance(name, bytes):
            self.name = name
        else:
            self.name = name.encode("us-ascii")

    def name_as_str(self):
        return self.name.decode("us-ascii")

    def __eq__(self, other):
        if isinstance(other, PdfName) and other.name == self.name:
            return True
        return other == self.name

    def __hash__(self):
        return hash(self.name)

    def __repr__(self):
        return f"PdfName({self.name!r})"

    @classmethod
    def from_pdf_stream(cls, data):
        """Build a PdfName from raw stream bytes, resolving #XX escapes."""
        return cls(PdfParser.interpret_name(data))

    def __bytes__(self):
        encoded = bytearray(b"/")
        for byte in self.name:
            if byte in self.allowed_chars:
                encoded.append(byte)
            else:
                encoded.extend(b"#%02X" % byte)
        return bytes(encoded)
class PdfArray(list):
    """A PDF array; elements are serialized via pdf_repr."""

    def __bytes__(self):
        body = b" ".join(pdf_repr(item) for item in self)
        return b"[ " + body + b" ]"
class PdfDict(collections.UserDict):
    """A PDF dictionary.

    Keys are bytes.  Values are also reachable via attribute access with a
    str name (``d.Producer`` reads ``d[b"Producer"]``); attribute reads
    decode bytes values as text and parse ``*Date`` entries into UTC
    time.struct_time values.
    """

    def __setattr__(self, key, value):
        # "data" is UserDict's backing store; any other attribute assignment
        # becomes a dictionary entry keyed by the ASCII-encoded name.
        if key == "data":
            collections.UserDict.__setattr__(self, key, value)
        else:
            self[key.encode("us-ascii")] = value

    def __getattr__(self, key):
        # Invoked only when normal attribute lookup fails: treat the
        # attribute name as a dictionary key.
        try:
            value = self[key.encode("us-ascii")]
        except KeyError as e:
            raise AttributeError(key) from e
        if isinstance(value, bytes):
            value = decode_text(value)
        if key.endswith("Date"):
            # PDF date string, e.g. D:YYYYMMDDHHmmSSOHH'mm'
            # (PDF 32000-1:2008, 7.9.4); parsed to a UTC struct_time.
            if value.startswith("D:"):
                value = value[2:]

            # UT relationship: "+", "-" or "Z", followed by HH and
            # optionally mm of offset.
            relationship = "Z"
            if len(value) > 17:
                relationship = value[14]
                offset = int(value[15:17]) * 60
                if len(value) > 20:
                    offset += int(value[18:20])

            # Truncate the format string to the fields actually present.
            format = "%Y%m%d%H%M%S"[: len(value) - 2]
            value = time.strptime(value[: len(format) + 2], format)
            if relationship in ["+", "-"]:
                # Convert the offset (minutes) to seconds and normalize
                # the timestamp to UTC.
                offset *= 60
                if relationship == "+":
                    offset *= -1
                value = time.gmtime(calendar.timegm(value) + offset)
        return value

    def __bytes__(self):
        # Serialize as << /Key value ... >>, skipping None values.
        out = bytearray(b"<<")
        for key, value in self.items():
            if value is None:
                continue
            value = pdf_repr(value)
            out.extend(b"\n")
            out.extend(bytes(PdfName(key)))
            out.extend(b" ")
            out.extend(value)
        out.extend(b"\n>>")
        return bytes(out)
class PdfBinary:
    """Binary data serialized as a PDF hexadecimal string, e.g. ``<DEADBEEF>``."""

    def __init__(self, data):
        self.data = data

    def __bytes__(self):
        hex_digits = b"".join(b"%02X" % byte for byte in self.data)
        return b"<%s>" % hex_digits
class PdfStream:
    """A PDF stream object: a stream dictionary plus the raw stream bytes."""

    def __init__(self, dictionary, buf):
        self.dictionary = dictionary
        self.buf = buf

    def decode(self):
        """Return the decoded stream payload.

        Only /FlateDecode is supported; an unfiltered stream is returned
        as-is.  Raises NotImplementedError for any other filter.
        """
        try:
            filter = self.dictionary.Filter
        except AttributeError:
            # No /Filter entry: the raw bytes are the payload.
            return self.buf
        if filter != b"FlateDecode":
            raise NotImplementedError(
                f"stream filter {repr(self.dictionary.Filter)} unknown/unsupported"
            )
        # /DL (decompressed length) is preferred as the bufsize hint,
        # falling back to /Length.
        try:
            expected_length = self.dictionary.DL
        except AttributeError:
            expected_length = self.dictionary.Length
        return zlib.decompress(self.buf, bufsize=int(expected_length))
def pdf_repr(x):
    """Serialize a Python value to its PDF byte representation.

    Handles booleans, None (null), numbers, time.struct_time (as a PDF
    date string), dict/list (wrapped in PdfDict/PdfArray), str (encoded
    as a UTF-16BE literal string) and bytes (escaped literal string).
    The Pdf* wrapper classes serialize via their own __bytes__; anything
    else falls back to bytes(x).
    """
    if x is True:
        return b"true"
    elif x is False:
        return b"false"
    elif x is None:
        return b"null"
    elif isinstance(x, (PdfName, PdfDict, PdfArray, PdfBinary)):
        return bytes(x)
    elif isinstance(x, (int, float)):
        # int and float share the same decimal text form.
        return str(x).encode("us-ascii")
    elif isinstance(x, time.struct_time):
        return b"(D:" + time.strftime("%Y%m%d%H%M%SZ", x).encode("us-ascii") + b")"
    elif isinstance(x, dict):
        return bytes(PdfDict(x))
    elif isinstance(x, list):
        return bytes(PdfArray(x))
    elif isinstance(x, str):
        return pdf_repr(encode_text(x))
    elif isinstance(x, bytes):
        # XXX escape more chars? handle binary garbage
        x = x.replace(b"\\", b"\\\\")  # escape backslashes first
        x = x.replace(b"(", b"\\(")
        x = x.replace(b")", b"\\)")
        return b"(" + x + b")"
    else:
        return bytes(x)
class PdfParser:
    """Based on
    https://www.adobe.com/content/dam/acom/en/devnet/acrobat/pdfs/PDF32000_2008.pdf
    Supports PDF up to 1.4
    """

    def __init__(self, filename=None, f=None, buf=None, start_offset=0, mode="rb"):
        # Data may come from a filename, an open file object, or an
        # in-memory buffer; start_offset supports a PDF embedded at a
        # non-zero position within a larger file.
        if buf and f:
            raise RuntimeError("specify buf or f or filename, but not both buf and f")
        self.filename = filename
        self.buf = buf
        self.f = f
        self.start_offset = start_offset
        self.should_close_buf = False
        self.should_close_file = False
        if filename is not None and f is None:
            self.f = f = open(filename, mode)
            self.should_close_file = True
        if f is not None:
            self.buf = buf = self.get_buf_from_file(f)
            self.should_close_buf = True
            if not filename and hasattr(f, "name"):
                self.filename = f.name
        self.cached_objects = {}
        if buf:
            self.read_pdf_info()
        else:
            # No existing data: initialize an empty document skeleton.
            self.file_size_total = self.file_size_this = 0
            self.root = PdfDict()
            self.root_ref = None
            self.info = PdfDict()
            self.info_ref = None
            self.page_tree_root = {}
            self.pages = []
            self.orig_pages = []
            self.pages_ref = None
            self.last_xref_section_offset = None
            self.trailer_dict = {}
            self.xref_table = XrefTable()
        # From here on, xref assignments record entries to be written out.
        self.xref_table.reading_finished = True
        if f:
            self.seek_end()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False  # do not suppress exceptions

    def start_writing(self):
        # Release the read buffer and append new data at the end of the
        # file (incremental update).
        self.close_buf()
        self.seek_end()

    def close_buf(self):
        try:
            self.buf.close()
        except AttributeError:
            # plain bytes (or None) have no close()
            pass
        self.buf = None

    def close(self):
        if self.should_close_buf:
            self.close_buf()
        if self.f is not None and self.should_close_file:
            self.f.close()
            self.f = None

    def seek_end(self):
        self.f.seek(0, os.SEEK_END)

    def write_header(self):
        self.f.write(b"%PDF-1.4\n")

    def write_comment(self, s):
        self.f.write(f"% {s}\n".encode())

    def write_catalog(self):
        """Write a fresh catalog and pages tree; returns the new root ref."""
        self.del_root()
        self.root_ref = self.next_object_id(self.f.tell())
        self.pages_ref = self.next_object_id(0)
        self.rewrite_pages()
        self.write_obj(self.root_ref, Type=PdfName(b"Catalog"), Pages=self.pages_ref)
        self.write_obj(
            self.pages_ref,
            Type=PdfName(b"Pages"),
            Count=len(self.pages),
            Kids=self.pages,
        )
        return self.root_ref

    def rewrite_pages(self):
        """Rewrite the original page objects under the new pages tree root."""
        pages_tree_nodes_to_delete = []
        for i, page_ref in enumerate(self.orig_pages):
            page_info = self.cached_objects[page_ref]
            del self.xref_table[page_ref.object_id]
            pages_tree_nodes_to_delete.append(page_info[PdfName(b"Parent")])
            if page_ref not in self.pages:
                # the page has been deleted
                continue
            # make dict keys into strings for passing to write_page
            stringified_page_info = {}
            for key, value in page_info.items():
                # key should be a PdfName
                stringified_page_info[key.name_as_str()] = value
            stringified_page_info["Parent"] = self.pages_ref
            new_page_ref = self.write_page(None, **stringified_page_info)
            for j, cur_page_ref in enumerate(self.pages):
                if cur_page_ref == page_ref:
                    # replace the page reference with the new one
                    self.pages[j] = new_page_ref
        # delete redundant Pages tree nodes from xref table
        for pages_tree_node_ref in pages_tree_nodes_to_delete:
            while pages_tree_node_ref:
                pages_tree_node = self.cached_objects[pages_tree_node_ref]
                if pages_tree_node_ref.object_id in self.xref_table:
                    del self.xref_table[pages_tree_node_ref.object_id]
                pages_tree_node_ref = pages_tree_node.get(b"Parent", None)
        self.orig_pages = []

    def write_xref_and_trailer(self, new_root_ref=None):
        """Write the xref section and trailer dictionary at the current
        file position."""
        if new_root_ref:
            self.del_root()
            self.root_ref = new_root_ref
        if self.info:
            self.info_ref = self.write_obj(None, self.info)
        start_xref = self.xref_table.write(self.f)
        num_entries = len(self.xref_table)
        trailer_dict = {b"Root": self.root_ref, b"Size": num_entries}
        if self.last_xref_section_offset is not None:
            # chain to the previous xref section (incremental update)
            trailer_dict[b"Prev"] = self.last_xref_section_offset
        if self.info:
            trailer_dict[b"Info"] = self.info_ref
        self.last_xref_section_offset = start_xref
        self.f.write(
            b"trailer\n"
            + bytes(PdfDict(trailer_dict))
            + b"\nstartxref\n%d\n%%%%EOF" % start_xref
        )

    def write_page(self, ref, *objs, **dict_obj):
        """Write a page object, defaulting /Type and /Parent; returns its ref."""
        if isinstance(ref, int):
            ref = self.pages[ref]
        if "Type" not in dict_obj:
            dict_obj["Type"] = PdfName(b"Page")
        if "Parent" not in dict_obj:
            dict_obj["Parent"] = self.pages_ref
        return self.write_obj(ref, *objs, **dict_obj)

    def write_obj(self, ref, *objs, **dict_obj):
        """Write an indirect object (dict, extra objects, optional stream).

        A "stream" keyword becomes the object's stream payload and sets
        /Length.  Returns the object's IndirectReference.
        """
        f = self.f
        if ref is None:
            ref = self.next_object_id(f.tell())
        else:
            self.xref_table[ref.object_id] = (f.tell(), ref.generation)
        f.write(bytes(IndirectObjectDef(*ref)))
        stream = dict_obj.pop("stream", None)
        if stream is not None:
            dict_obj["Length"] = len(stream)
        if dict_obj:
            f.write(pdf_repr(dict_obj))
        for obj in objs:
            f.write(pdf_repr(obj))
        if stream is not None:
            f.write(b"stream\n")
            f.write(stream)
            f.write(b"\nendstream\n")
        f.write(b"endobj\n")
        return ref

    def del_root(self):
        # Remove the current catalog and its pages tree root from the xref
        # table so they can be rewritten.
        if self.root_ref is None:
            return
        del self.xref_table[self.root_ref.object_id]
        del self.xref_table[self.root[b"Pages"].object_id]

    @staticmethod
    def get_buf_from_file(f):
        """Return a read buffer for *f*: its internal buffer if it has one,
        otherwise a read-only mmap of the file."""
        if hasattr(f, "getbuffer"):
            return f.getbuffer()
        elif hasattr(f, "getvalue"):
            return f.getvalue()
        else:
            try:
                return mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
            except ValueError:  # cannot mmap an empty file
                return b""

    def read_pdf_info(self):
        """Parse the trailer, catalog, info dict and page tree from the buffer."""
        self.file_size_total = len(self.buf)
        self.file_size_this = self.file_size_total - self.start_offset
        self.read_trailer()
        self.root_ref = self.trailer_dict[b"Root"]
        self.info_ref = self.trailer_dict.get(b"Info", None)
        self.root = PdfDict(self.read_indirect(self.root_ref))
        if self.info_ref is None:
            self.info = PdfDict()
        else:
            self.info = PdfDict(self.read_indirect(self.info_ref))
        check_format_condition(b"Type" in self.root, "/Type missing in Root")
        check_format_condition(
            self.root[b"Type"] == b"Catalog", "/Type in Root is not /Catalog"
        )
        check_format_condition(b"Pages" in self.root, "/Pages missing in Root")
        check_format_condition(
            isinstance(self.root[b"Pages"], IndirectReference),
            "/Pages in Root is not an indirect reference",
        )
        self.pages_ref = self.root[b"Pages"]
        self.page_tree_root = self.read_indirect(self.pages_ref)
        self.pages = self.linearize_page_tree(self.page_tree_root)
        # save the original list of page references
        # in case the user modifies, adds or deletes some pages
        # and we need to rewrite the pages and their list
        self.orig_pages = self.pages[:]

    def next_object_id(self, offset=None):
        """Allocate the next unused object ID; register it at *offset* if given."""
        try:
            # TODO: support reuse of deleted objects
            reference = IndirectReference(max(self.xref_table.keys()) + 1, 0)
        except ValueError:
            # empty xref table: start numbering at 1
            reference = IndirectReference(1, 0)
        if offset is not None:
            self.xref_table[reference.object_id] = (offset, 0)
        return reference

    # Byte-pattern fragments used to assemble the parsing regexes below
    # (character classes per PDF 32000-1:2008, 7.2.2).
    delimiter = rb"[][()<>{}/%]"
    delimiter_or_ws = rb"[][()<>{}/%\000\011\012\014\015\040]"
    whitespace = rb"[\000\011\012\014\015\040]"
    whitespace_or_hex = rb"[\000\011\012\014\015\0400-9a-fA-F]"
    whitespace_optional = whitespace + b"*"
    whitespace_mandatory = whitespace + b"+"
    # No "\012" aka "\n" or "\015" aka "\r":
    whitespace_optional_no_nl = rb"[\000\011\014\040]*"
    newline_only = rb"[\r\n]+"
    newline = whitespace_optional_no_nl + newline_only + whitespace_optional_no_nl
    re_trailer_end = re.compile(
        whitespace_mandatory
        + rb"trailer"
        + whitespace_optional
        + rb"<<(.*>>)"
        + newline
        + rb"startxref"
        + newline
        + rb"([0-9]+)"
        + newline
        + rb"%%EOF"
        + whitespace_optional
        + rb"$",
        re.DOTALL,
    )
    re_trailer_prev = re.compile(
        whitespace_optional
        + rb"trailer"
        + whitespace_optional
        + rb"<<(.*?>>)"
        + newline
        + rb"startxref"
        + newline
        + rb"([0-9]+)"
        + newline
        + rb"%%EOF"
        + whitespace_optional,
        re.DOTALL,
    )

    def read_trailer(self):
        """Locate and parse the last trailer, then read its xref chain."""
        # search only the tail of the buffer for the trailer
        search_start_offset = len(self.buf) - 16384
        if search_start_offset < self.start_offset:
            search_start_offset = self.start_offset
        m = self.re_trailer_end.search(self.buf, search_start_offset)
        check_format_condition(m, "trailer end not found")
        # make sure we found the LAST trailer
        last_match = m
        while m:
            last_match = m
            m = self.re_trailer_end.search(self.buf, m.start() + 16)
        if not m:
            m = last_match
        trailer_data = m.group(1)
        self.last_xref_section_offset = int(m.group(2))
        self.trailer_dict = self.interpret_trailer(trailer_data)
        self.xref_table = XrefTable()
        self.read_xref_table(xref_section_offset=self.last_xref_section_offset)
        if b"Prev" in self.trailer_dict:
            self.read_prev_trailer(self.trailer_dict[b"Prev"])

    def read_prev_trailer(self, xref_section_offset):
        """Read an earlier xref section + trailer, recursing along /Prev links."""
        trailer_offset = self.read_xref_table(xref_section_offset=xref_section_offset)
        m = self.re_trailer_prev.search(
            self.buf[trailer_offset : trailer_offset + 16384]
        )
        check_format_condition(m, "previous trailer not found")
        trailer_data = m.group(1)
        check_format_condition(
            int(m.group(2)) == xref_section_offset,
            "xref section offset in previous trailer doesn't match what was expected",
        )
        trailer_dict = self.interpret_trailer(trailer_data)
        if b"Prev" in trailer_dict:
            self.read_prev_trailer(trailer_dict[b"Prev"])

    re_whitespace_optional = re.compile(whitespace_optional)
    re_name = re.compile(
        whitespace_optional
        + rb"/([!-$&'*-.0-;=?-Z\\^-z|~]+)(?="
        + delimiter_or_ws
        + rb")"
    )
    re_dict_start = re.compile(whitespace_optional + rb"<<")
    re_dict_end = re.compile(whitespace_optional + rb">>" + whitespace_optional)

    @classmethod
    def interpret_trailer(cls, trailer_data):
        """Parse the trailer dictionary bytes into a Python dict and
        validate the mandatory /Size and /Root entries."""
        trailer = {}
        offset = 0
        while True:
            m = cls.re_name.match(trailer_data, offset)
            if not m:
                m = cls.re_dict_end.match(trailer_data, offset)
                check_format_condition(
                    m and m.end() == len(trailer_data),
                    "name not found in trailer, remaining data: "
                    + repr(trailer_data[offset:]),
                )
                break
            key = cls.interpret_name(m.group(1))
            value, offset = cls.get_value(trailer_data, m.end())
            trailer[key] = value
        check_format_condition(
            b"Size" in trailer and isinstance(trailer[b"Size"], int),
            "/Size not in trailer or not an integer",
        )
        check_format_condition(
            b"Root" in trailer and isinstance(trailer[b"Root"], IndirectReference),
            "/Root not in trailer or not an indirect reference",
        )
        return trailer

    re_hashes_in_name = re.compile(rb"([^#]*)(#([0-9a-fA-F]{2}))?")

    @classmethod
    def interpret_name(cls, raw, as_text=False):
        """Resolve #XX hex escapes in a raw name; returns bytes (or str
        when as_text=True)."""
        name = b""
        for m in cls.re_hashes_in_name.finditer(raw):
            if m.group(3):
                name += m.group(1) + bytearray.fromhex(m.group(3).decode("us-ascii"))
            else:
                name += m.group(1)
        if as_text:
            return name.decode("utf-8")
        else:
            return bytes(name)

    # Token regexes for get_value(); each allows optional leading whitespace.
    re_null = re.compile(whitespace_optional + rb"null(?=" + delimiter_or_ws + rb")")
    re_true = re.compile(whitespace_optional + rb"true(?=" + delimiter_or_ws + rb")")
    re_false = re.compile(whitespace_optional + rb"false(?=" + delimiter_or_ws + rb")")
    re_int = re.compile(
        whitespace_optional + rb"([-+]?[0-9]+)(?=" + delimiter_or_ws + rb")"
    )
    re_real = re.compile(
        whitespace_optional
        + rb"([-+]?([0-9]+\.[0-9]*|[0-9]*\.[0-9]+))(?="
        + delimiter_or_ws
        + rb")"
    )
    re_array_start = re.compile(whitespace_optional + rb"\[")
    re_array_end = re.compile(whitespace_optional + rb"]")
    re_string_hex = re.compile(
        whitespace_optional + rb"<(" + whitespace_or_hex + rb"*)>"
    )
    re_string_lit = re.compile(whitespace_optional + rb"\(")
    re_indirect_reference = re.compile(
        whitespace_optional
        + rb"([-+]?[0-9]+)"
        + whitespace_mandatory
        + rb"([-+]?[0-9]+)"
        + whitespace_mandatory
        + rb"R(?="
        + delimiter_or_ws
        + rb")"
    )
    re_indirect_def_start = re.compile(
        whitespace_optional
        + rb"([-+]?[0-9]+)"
        + whitespace_mandatory
        + rb"([-+]?[0-9]+)"
        + whitespace_mandatory
        + rb"obj(?="
        + delimiter_or_ws
        + rb")"
    )
    re_indirect_def_end = re.compile(
        whitespace_optional + rb"endobj(?=" + delimiter_or_ws + rb")"
    )
    re_comment = re.compile(
        rb"(" + whitespace_optional + rb"%[^\r\n]*" + newline + rb")*"
    )
    re_stream_start = re.compile(whitespace_optional + rb"stream\r?\n")
    re_stream_end = re.compile(
        whitespace_optional + rb"endstream(?=" + delimiter_or_ws + rb")"
    )

    @classmethod
    def get_value(cls, data, offset, expect_indirect=None, max_nesting=-1):
        """Parse one PDF object at data[offset:]; returns (value, next_offset).

        next_offset is None when max_nesting ran out mid-parse.  When
        expect_indirect is given, the data must be the definition of
        exactly that indirect object.
        """
        if max_nesting == 0:
            return None, None
        m = cls.re_comment.match(data, offset)
        if m:
            offset = m.end()
        m = cls.re_indirect_def_start.match(data, offset)
        if m:
            check_format_condition(
                int(m.group(1)) > 0,
                "indirect object definition: object ID must be greater than 0",
            )
            check_format_condition(
                int(m.group(2)) >= 0,
                "indirect object definition: generation must be non-negative",
            )
            check_format_condition(
                expect_indirect is None
                or expect_indirect
                == IndirectReference(int(m.group(1)), int(m.group(2))),
                "indirect object definition different than expected",
            )
            object, offset = cls.get_value(data, m.end(), max_nesting=max_nesting - 1)
            if offset is None:
                return object, None
            m = cls.re_indirect_def_end.match(data, offset)
            check_format_condition(m, "indirect object definition end not found")
            return object, m.end()
        check_format_condition(
            not expect_indirect, "indirect object definition not found"
        )
        m = cls.re_indirect_reference.match(data, offset)
        if m:
            check_format_condition(
                int(m.group(1)) > 0,
                "indirect object reference: object ID must be greater than 0",
            )
            check_format_condition(
                int(m.group(2)) >= 0,
                "indirect object reference: generation must be non-negative",
            )
            return IndirectReference(int(m.group(1)), int(m.group(2))), m.end()
        m = cls.re_dict_start.match(data, offset)
        if m:
            # dictionary (possibly followed by a stream)
            offset = m.end()
            result = {}
            m = cls.re_dict_end.match(data, offset)
            while not m:
                key, offset = cls.get_value(data, offset, max_nesting=max_nesting - 1)
                if offset is None:
                    return result, None
                value, offset = cls.get_value(data, offset, max_nesting=max_nesting - 1)
                result[key] = value
                if offset is None:
                    return result, None
                m = cls.re_dict_end.match(data, offset)
            offset = m.end()
            m = cls.re_stream_start.match(data, offset)
            if m:
                try:
                    stream_len = int(result[b"Length"])
                except (TypeError, KeyError, ValueError) as e:
                    raise PdfFormatError(
                        "bad or missing Length in stream dict (%r)"
                        % result.get(b"Length", None)
                    ) from e
                stream_data = data[m.end() : m.end() + stream_len]
                m = cls.re_stream_end.match(data, m.end() + stream_len)
                check_format_condition(m, "stream end not found")
                offset = m.end()
                result = PdfStream(PdfDict(result), stream_data)
            else:
                result = PdfDict(result)
            return result, offset
        m = cls.re_array_start.match(data, offset)
        if m:
            offset = m.end()
            result = []
            m = cls.re_array_end.match(data, offset)
            while not m:
                value, offset = cls.get_value(data, offset, max_nesting=max_nesting - 1)
                result.append(value)
                if offset is None:
                    return result, None
                m = cls.re_array_end.match(data, offset)
            return result, m.end()
        m = cls.re_null.match(data, offset)
        if m:
            return None, m.end()
        m = cls.re_true.match(data, offset)
        if m:
            return True, m.end()
        m = cls.re_false.match(data, offset)
        if m:
            return False, m.end()
        m = cls.re_name.match(data, offset)
        if m:
            return PdfName(cls.interpret_name(m.group(1))), m.end()
        m = cls.re_int.match(data, offset)
        if m:
            return int(m.group(1)), m.end()
        m = cls.re_real.match(data, offset)
        if m:
            # XXX Decimal instead of float???
            return float(m.group(1)), m.end()
        m = cls.re_string_hex.match(data, offset)
        if m:
            # filter out whitespace
            hex_string = bytearray(
                b for b in m.group(1) if b in b"0123456789abcdefABCDEF"
            )
            if len(hex_string) % 2 == 1:
                # append a 0 if the length is not even - yes, at the end
                hex_string.append(ord(b"0"))
            return bytearray.fromhex(hex_string.decode("us-ascii")), m.end()
        m = cls.re_string_lit.match(data, offset)
        if m:
            return cls.get_literal_string(data, m.end())
        # return None, offset  # fallback (only for debugging)
        raise PdfFormatError("unrecognized object: " + repr(data[offset : offset + 32]))

    re_lit_str_token = re.compile(
        rb"(\\[nrtbf()\\])|(\\[0-9]{1,3})|(\\(\r\n|\r|\n))|(\r\n|\r|\n)|(\()|(\))"
    )
    # Escape sequences in literal strings; keyed both by the escape byte
    # as bytes and by its ordinal (indexing bytes yields an int).
    escaped_chars = {
        b"n": b"\n",
        b"r": b"\r",
        b"t": b"\t",
        b"b": b"\b",
        b"f": b"\f",
        b"(": b"(",
        b")": b")",
        b"\\": b"\\",
        ord(b"n"): b"\n",
        ord(b"r"): b"\r",
        ord(b"t"): b"\t",
        ord(b"b"): b"\b",
        ord(b"f"): b"\f",
        ord(b"("): b"(",
        ord(b")"): b")",
        ord(b"\\"): b"\\",
    }

    @classmethod
    def get_literal_string(cls, data, offset):
        """Parse a literal string body (after the opening paren); returns
        (bytes, offset just past the closing paren)."""
        nesting_depth = 0
        result = bytearray()
        for m in cls.re_lit_str_token.finditer(data, offset):
            result.extend(data[offset : m.start()])
            if m.group(1):
                # backslash escape, e.g. \n or \(
                result.extend(cls.escaped_chars[m.group(1)[1]])
            elif m.group(2):
                # octal escape \ddd
                result.append(int(m.group(2)[1:], 8))
            elif m.group(3):
                # line continuation: backslash-newline is dropped
                pass
            elif m.group(5):
                # bare newline normalizes to \n
                result.extend(b"\n")
            elif m.group(6):
                # nested opening paren
                result.extend(b"(")
                nesting_depth += 1
            elif m.group(7):
                if nesting_depth == 0:
                    return bytes(result), m.end()
                result.extend(b")")
                nesting_depth -= 1
            offset = m.end()
        raise PdfFormatError("unfinished literal string")

    re_xref_section_start = re.compile(whitespace_optional + rb"xref" + newline)
    re_xref_subsection_start = re.compile(
        whitespace_optional
        + rb"([0-9]+)"
        + whitespace_mandatory
        + rb"([0-9]+)"
        + whitespace_optional
        + newline_only
    )
    re_xref_entry = re.compile(rb"([0-9]{10}) ([0-9]{5}) ([fn])( \r| \n|\r\n)")

    def read_xref_table(self, xref_section_offset):
        """Parse one xref section into self.xref_table.

        Returns the buffer offset just past the section (where the
        trailer is expected to follow).
        """
        subsection_found = False
        m = self.re_xref_section_start.match(
            self.buf, xref_section_offset + self.start_offset
        )
        check_format_condition(m, "xref section start not found")
        offset = m.end()
        while True:
            m = self.re_xref_subsection_start.match(self.buf, offset)
            if not m:
                check_format_condition(
                    subsection_found, "xref subsection start not found"
                )
                break
            subsection_found = True
            offset = m.end()
            first_object = int(m.group(1))
            num_objects = int(m.group(2))
            for i in range(first_object, first_object + num_objects):
                m = self.re_xref_entry.match(self.buf, offset)
                check_format_condition(m, "xref entry not found")
                offset = m.end()
                is_free = m.group(3) == b"f"
                generation = int(m.group(2))
                if not is_free:
                    new_entry = (int(m.group(1)), generation)
                    check_format_condition(
                        i not in self.xref_table or self.xref_table[i] == new_entry,
                        "xref entry duplicated (and not identical)",
                    )
                    self.xref_table[i] = new_entry
        return offset

    def read_indirect(self, ref, max_nesting=-1):
        """Read and cache the indirect object that *ref* points to."""
        offset, generation = self.xref_table[ref[0]]
        check_format_condition(
            generation == ref[1],
            f"expected to find generation {ref[1]} for object ID {ref[0]} in xref "
            f"table, instead found generation {generation} at offset {offset}",
        )
        value = self.get_value(
            self.buf,
            offset + self.start_offset,
            expect_indirect=IndirectReference(*ref),
            max_nesting=max_nesting,
        )[0]
        self.cached_objects[ref] = value
        return value

    def linearize_page_tree(self, node=None):
        """Flatten the page tree rooted at *node* into a list of page refs,
        in document order."""
        if node is None:
            node = self.page_tree_root
        check_format_condition(
            node[b"Type"] == b"Pages", "/Type of page tree node is not /Pages"
        )
        pages = []
        for kid in node[b"Kids"]:
            kid_object = self.read_indirect(kid)
            if kid_object[b"Type"] == b"Page":
                pages.append(kid)
            else:
                pages.extend(self.linearize_page_tree(node=kid_object))
        return pages