Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/PIL/ImageFile.py: 26%

351 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1# 

2# The Python Imaging Library. 

3# $Id$ 

4# 

5# base class for image file handlers 

6# 

7# history: 

8# 1995-09-09 fl Created 

9# 1996-03-11 fl Fixed load mechanism. 

10# 1996-04-15 fl Added pcx/xbm decoders. 

11# 1996-04-30 fl Added encoders. 

12# 1996-12-14 fl Added load helpers 

13# 1997-01-11 fl Use encode_to_file where possible 

14# 1997-08-27 fl Flush output in _save 

15# 1998-03-05 fl Use memory mapping for some modes 

16# 1999-02-04 fl Use memory mapping also for "I;16" and "I;16B" 

17# 1999-05-31 fl Added image parser 

18# 2000-10-12 fl Set readonly flag on memory-mapped images 

19# 2002-03-20 fl Use better messages for common decoder errors 

20# 2003-04-21 fl Fall back on mmap/map_buffer if map is not available 

21# 2003-10-30 fl Added StubImageFile class 

22# 2004-02-25 fl Made incremental parser more robust 

23# 

24# Copyright (c) 1997-2004 by Secret Labs AB 

25# Copyright (c) 1995-2004 by Fredrik Lundh 

26# 

27# See the README file for information on usage and redistribution. 

28# 

29 

30import io 

31import itertools 

32import struct 

33import sys 

34 

35from . import Image 

36from ._util import is_path 

37 

38MAXBLOCK = 65536 

39 

40SAFEBLOCK = 1024 * 1024 

41 

42LOAD_TRUNCATED_IMAGES = False 

43"""Whether or not to load truncated image files. User code may change this.""" 

44 

45ERRORS = { 

46 -1: "image buffer overrun error", 

47 -2: "decoding error", 

48 -3: "unknown error", 

49 -8: "bad configuration", 

50 -9: "out of memory error", 

51} 

52""" 

53Dict of known error codes returned from :meth:`.PyDecoder.decode`, 

54:meth:`.PyEncoder.encode` :meth:`.PyEncoder.encode_to_pyfd` and 

55:meth:`.PyEncoder.encode_to_file`. 

56""" 

57 

58 

59# 

60# -------------------------------------------------------------------- 

61# Helpers 

62 

63 

64def raise_oserror(error): 

65 try: 

66 message = Image.core.getcodecstatus(error) 

67 except AttributeError: 

68 message = ERRORS.get(error) 

69 if not message: 

70 message = f"decoder error {error}" 

71 raise OSError(message + " when reading image file") 

72 

73 

74def _tilesort(t): 

75 # sort on offset 

76 return t[2] 

77 

78 

79# 

80# -------------------------------------------------------------------- 

81# ImageFile base class 

82 

83 

84class ImageFile(Image.Image): 

85 """Base class for image file format handlers.""" 

86 

87 def __init__(self, fp=None, filename=None): 

88 super().__init__() 

89 

90 self._min_frame = 0 

91 

92 self.custom_mimetype = None 

93 

94 self.tile = None 

95 """ A list of tile descriptors, or ``None`` """ 

96 

97 self.readonly = 1 # until we know better 

98 

99 self.decoderconfig = () 

100 self.decodermaxblock = MAXBLOCK 

101 

102 if is_path(fp): 102 ↛ 104line 102 didn't jump to line 104, because the condition on line 102 was never true

103 # filename 

104 self.fp = open(fp, "rb") 

105 self.filename = fp 

106 self._exclusive_fp = True 

107 else: 

108 # stream 

109 self.fp = fp 

110 self.filename = filename 

111 # can be overridden 

112 self._exclusive_fp = None 

113 

114 try: 

115 try: 

116 self._open() 

117 except ( 

118 IndexError, # end of data 

119 TypeError, # end of data (ord) 

120 KeyError, # unsupported mode 

121 EOFError, # got header but not the first frame 

122 struct.error, 

123 ) as v: 

124 raise SyntaxError(v) from v 

125 

126 if not self.mode or self.size[0] <= 0 or self.size[1] <= 0: 126 ↛ 127line 126 didn't jump to line 127, because the condition on line 126 was never true

127 raise SyntaxError("not identified by this driver") 

128 except BaseException: 

129 # close the file only if we have opened it this constructor 

130 if self._exclusive_fp: 

131 self.fp.close() 

132 raise 

133 

134 def get_format_mimetype(self): 

135 if self.custom_mimetype: 

136 return self.custom_mimetype 

137 if self.format is not None: 

138 return Image.MIME.get(self.format.upper()) 

139 

140 def verify(self): 

141 """Check file integrity""" 

142 

143 # raise exception if something's wrong. must be called 

144 # directly after open, and closes file when finished. 

145 if self._exclusive_fp: 

146 self.fp.close() 

147 self.fp = None 

148 

149 def load(self): 

150 """Load image data based on tile list""" 

151 

152 if self.tile is None: 

153 raise OSError("cannot load this image") 

154 

155 pixel = Image.Image.load(self) 

156 if not self.tile: 

157 return pixel 

158 

159 self.map = None 

160 use_mmap = self.filename and len(self.tile) == 1 

161 # As of pypy 2.1.0, memory mapping was failing here. 

162 use_mmap = use_mmap and not hasattr(sys, "pypy_version_info") 

163 

164 readonly = 0 

165 

166 # look for read/seek overrides 

167 try: 

168 read = self.load_read 

169 # don't use mmap if there are custom read/seek functions 

170 use_mmap = False 

171 except AttributeError: 

172 read = self.fp.read 

173 

174 try: 

175 seek = self.load_seek 

176 use_mmap = False 

177 except AttributeError: 

178 seek = self.fp.seek 

179 

180 if use_mmap: 

181 # try memory mapping 

182 decoder_name, extents, offset, args = self.tile[0] 

183 if ( 

184 decoder_name == "raw" 

185 and len(args) >= 3 

186 and args[0] == self.mode 

187 and args[0] in Image._MAPMODES 

188 ): 

189 try: 

190 # use mmap, if possible 

191 import mmap 

192 

193 with open(self.filename) as fp: 

194 self.map = mmap.mmap(fp.fileno(), 0, access=mmap.ACCESS_READ) 

195 self.im = Image.core.map_buffer( 

196 self.map, self.size, decoder_name, offset, args 

197 ) 

198 readonly = 1 

199 # After trashing self.im, 

200 # we might need to reload the palette data. 

201 if self.palette: 

202 self.palette.dirty = 1 

203 except (AttributeError, OSError, ImportError): 

204 self.map = None 

205 

206 self.load_prepare() 

207 err_code = -3 # initialize to unknown error 

208 if not self.map: 

209 # sort tiles in file order 

210 self.tile.sort(key=_tilesort) 

211 

212 try: 

213 # FIXME: This is a hack to handle TIFF's JpegTables tag. 

214 prefix = self.tile_prefix 

215 except AttributeError: 

216 prefix = b"" 

217 

218 # Remove consecutive duplicates that only differ by their offset 

219 self.tile = [ 

220 list(tiles)[-1] 

221 for _, tiles in itertools.groupby( 

222 self.tile, lambda tile: (tile[0], tile[1], tile[3]) 

223 ) 

224 ] 

225 for decoder_name, extents, offset, args in self.tile: 

226 seek(offset) 

227 decoder = Image._getdecoder( 

228 self.mode, decoder_name, args, self.decoderconfig 

229 ) 

230 try: 

231 decoder.setimage(self.im, extents) 

232 if decoder.pulls_fd: 

233 decoder.setfd(self.fp) 

234 err_code = decoder.decode(b"")[1] 

235 else: 

236 b = prefix 

237 while True: 

238 try: 

239 s = read(self.decodermaxblock) 

240 except (IndexError, struct.error) as e: 

241 # truncated png/gif 

242 if LOAD_TRUNCATED_IMAGES: 

243 break 

244 else: 

245 raise OSError("image file is truncated") from e 

246 

247 if not s: # truncated jpeg 

248 if LOAD_TRUNCATED_IMAGES: 

249 break 

250 else: 

251 raise OSError( 

252 "image file is truncated " 

253 f"({len(b)} bytes not processed)" 

254 ) 

255 

256 b = b + s 

257 n, err_code = decoder.decode(b) 

258 if n < 0: 

259 break 

260 b = b[n:] 

261 finally: 

262 # Need to cleanup here to prevent leaks 

263 decoder.cleanup() 

264 

265 self.tile = [] 

266 self.readonly = readonly 

267 

268 self.load_end() 

269 

270 if self._exclusive_fp and self._close_exclusive_fp_after_loading: 

271 self.fp.close() 

272 self.fp = None 

273 

274 if not self.map and not LOAD_TRUNCATED_IMAGES and err_code < 0: 

275 # still raised if decoder fails to return anything 

276 raise_oserror(err_code) 

277 

278 return Image.Image.load(self) 

279 

280 def load_prepare(self): 

281 # create image memory if necessary 

282 if not self.im or self.im.mode != self.mode or self.im.size != self.size: 

283 self.im = Image.core.new(self.mode, self.size) 

284 # create palette (optional) 

285 if self.mode == "P": 

286 Image.Image.load(self) 

287 

288 def load_end(self): 

289 # may be overridden 

290 pass 

291 

292 # may be defined for contained formats 

293 # def load_seek(self, pos): 

294 # pass 

295 

296 # may be defined for blocked formats (e.g. PNG) 

297 # def load_read(self, bytes): 

298 # pass 

299 

300 def _seek_check(self, frame): 

301 if ( 

302 frame < self._min_frame 

303 # Only check upper limit on frames if additional seek operations 

304 # are not required to do so 

305 or ( 

306 not (hasattr(self, "_n_frames") and self._n_frames is None) 

307 and frame >= self.n_frames + self._min_frame 

308 ) 

309 ): 

310 raise EOFError("attempt to seek outside sequence") 

311 

312 return self.tell() != frame 

313 

314 

315class StubImageFile(ImageFile): 

316 """ 

317 Base class for stub image loaders. 

318 

319 A stub loader is an image loader that can identify files of a 

320 certain format, but relies on external code to load the file. 

321 """ 

322 

323 def _open(self): 

324 raise NotImplementedError("StubImageFile subclass must implement _open") 

325 

326 def load(self): 

327 loader = self._load() 

328 if loader is None: 

329 raise OSError(f"cannot find loader for this {self.format} file") 

330 image = loader.load(self) 

331 assert image is not None 

332 # become the other object (!) 

333 self.__class__ = image.__class__ 

334 self.__dict__ = image.__dict__ 

335 return image.load() 

336 

337 def _load(self): 

338 """(Hook) Find actual image loader.""" 

339 raise NotImplementedError("StubImageFile subclass must implement _load") 

340 

341 

342class Parser: 

343 """ 

344 Incremental image parser. This class implements the standard 

345 feed/close consumer interface. 

346 """ 

347 

348 incremental = None 

349 image = None 

350 data = None 

351 decoder = None 

352 offset = 0 

353 finished = 0 

354 

355 def reset(self): 

356 """ 

357 (Consumer) Reset the parser. Note that you can only call this 

358 method immediately after you've created a parser; parser 

359 instances cannot be reused. 

360 """ 

361 assert self.data is None, "cannot reuse parsers" 

362 

363 def feed(self, data): 

364 """ 

365 (Consumer) Feed data to the parser. 

366 

367 :param data: A string buffer. 

368 :exception OSError: If the parser failed to parse the image file. 

369 """ 

370 # collect data 

371 

372 if self.finished: 

373 return 

374 

375 if self.data is None: 

376 self.data = data 

377 else: 

378 self.data = self.data + data 

379 

380 # parse what we have 

381 if self.decoder: 

382 

383 if self.offset > 0: 

384 # skip header 

385 skip = min(len(self.data), self.offset) 

386 self.data = self.data[skip:] 

387 self.offset = self.offset - skip 

388 if self.offset > 0 or not self.data: 

389 return 

390 

391 n, e = self.decoder.decode(self.data) 

392 

393 if n < 0: 

394 # end of stream 

395 self.data = None 

396 self.finished = 1 

397 if e < 0: 

398 # decoding error 

399 self.image = None 

400 raise_oserror(e) 

401 else: 

402 # end of image 

403 return 

404 self.data = self.data[n:] 

405 

406 elif self.image: 

407 

408 # if we end up here with no decoder, this file cannot 

409 # be incrementally parsed. wait until we've gotten all 

410 # available data 

411 pass 

412 

413 else: 

414 

415 # attempt to open this file 

416 try: 

417 with io.BytesIO(self.data) as fp: 

418 im = Image.open(fp) 

419 except OSError: 

420 # traceback.print_exc() 

421 pass # not enough data 

422 else: 

423 flag = hasattr(im, "load_seek") or hasattr(im, "load_read") 

424 if flag or len(im.tile) != 1: 

425 # custom load code, or multiple tiles 

426 self.decode = None 

427 else: 

428 # initialize decoder 

429 im.load_prepare() 

430 d, e, o, a = im.tile[0] 

431 im.tile = [] 

432 self.decoder = Image._getdecoder(im.mode, d, a, im.decoderconfig) 

433 self.decoder.setimage(im.im, e) 

434 

435 # calculate decoder offset 

436 self.offset = o 

437 if self.offset <= len(self.data): 

438 self.data = self.data[self.offset :] 

439 self.offset = 0 

440 

441 self.image = im 

442 

443 def __enter__(self): 

444 return self 

445 

446 def __exit__(self, *args): 

447 self.close() 

448 

449 def close(self): 

450 """ 

451 (Consumer) Close the stream. 

452 

453 :returns: An image object. 

454 :exception OSError: If the parser failed to parse the image file either 

455 because it cannot be identified or cannot be 

456 decoded. 

457 """ 

458 # finish decoding 

459 if self.decoder: 

460 # get rid of what's left in the buffers 

461 self.feed(b"") 

462 self.data = self.decoder = None 

463 if not self.finished: 

464 raise OSError("image was incomplete") 

465 if not self.image: 

466 raise OSError("cannot parse this image") 

467 if self.data: 

468 # incremental parsing not possible; reopen the file 

469 # not that we have all data 

470 with io.BytesIO(self.data) as fp: 

471 try: 

472 self.image = Image.open(fp) 

473 finally: 

474 self.image.load() 

475 return self.image 

476 

477 

478# -------------------------------------------------------------------- 

479 

480 

481def _save(im, fp, tile, bufsize=0): 

482 """Helper to save image based on tile list 

483 

484 :param im: Image object. 

485 :param fp: File object. 

486 :param tile: Tile list. 

487 :param bufsize: Optional buffer size 

488 """ 

489 

490 im.load() 

491 if not hasattr(im, "encoderconfig"): 491 ↛ 492line 491 didn't jump to line 492, because the condition on line 491 was never true

492 im.encoderconfig = () 

493 tile.sort(key=_tilesort) 

494 # FIXME: make MAXBLOCK a configuration parameter 

495 # It would be great if we could have the encoder specify what it needs 

496 # But, it would need at least the image size in most cases. RawEncode is 

497 # a tricky case. 

498 bufsize = max(MAXBLOCK, bufsize, im.size[0] * 4) # see RawEncode.c 

499 try: 

500 fh = fp.fileno() 

501 fp.flush() 

502 exc = None 

503 except (AttributeError, io.UnsupportedOperation) as e: 

504 exc = e 

505 for e, b, o, a in tile: 

506 if o > 0: 506 ↛ 507line 506 didn't jump to line 507, because the condition on line 506 was never true

507 fp.seek(o) 

508 encoder = Image._getencoder(im.mode, e, a, im.encoderconfig) 

509 try: 

510 encoder.setimage(im.im, b) 

511 if encoder.pushes_fd: 511 ↛ 512line 511 didn't jump to line 512, because the condition on line 511 was never true

512 encoder.setfd(fp) 

513 l, s = encoder.encode_to_pyfd() 

514 else: 

515 if exc: 515 ↛ 524line 515 didn't jump to line 524, because the condition on line 515 was never false

516 # compress to Python file-compatible object 

517 while True: 

518 l, s, d = encoder.encode(bufsize) 

519 fp.write(d) 

520 if s: 520 ↛ 518line 520 didn't jump to line 518, because the condition on line 520 was never false

521 break 

522 else: 

523 # slight speedup: compress to real file object 

524 s = encoder.encode_to_file(fh, bufsize) 

525 if s < 0: 525 ↛ 526line 525 didn't jump to line 526, because the condition on line 525 was never true

526 raise OSError(f"encoder error {s} when writing image file") from exc 

527 finally: 

528 encoder.cleanup() 528 ↛ exitline 528 didn't except from function '_save', because the raise on line 526 wasn't executed

529 if hasattr(fp, "flush"): 529 ↛ 530line 529 didn't jump to line 530, because the condition on line 529 was never true

530 fp.flush() 

531 

532 

533def _safe_read(fp, size): 

534 """ 

535 Reads large blocks in a safe way. Unlike fp.read(n), this function 

536 doesn't trust the user. If the requested size is larger than 

537 SAFEBLOCK, the file is read block by block. 

538 

539 :param fp: File handle. Must implement a <b>read</b> method. 

540 :param size: Number of bytes to read. 

541 :returns: A string containing <i>size</i> bytes of data. 

542 

543 Raises an OSError if the file is truncated and the read cannot be completed 

544 

545 """ 

546 if size <= 0: 546 ↛ 547line 546 didn't jump to line 547, because the condition on line 546 was never true

547 return b"" 

548 if size <= SAFEBLOCK: 548 ↛ 553line 548 didn't jump to line 553, because the condition on line 548 was never false

549 data = fp.read(size) 

550 if len(data) < size: 550 ↛ 551line 550 didn't jump to line 551, because the condition on line 550 was never true

551 raise OSError("Truncated File Read") 

552 return data 

553 data = [] 

554 remaining_size = size 

555 while remaining_size > 0: 

556 block = fp.read(min(remaining_size, SAFEBLOCK)) 

557 if not block: 

558 break 

559 data.append(block) 

560 remaining_size -= len(block) 

561 if sum(len(d) for d in data) < size: 

562 raise OSError("Truncated File Read") 

563 return b"".join(data) 

564 

565 

566class PyCodecState: 

567 def __init__(self): 

568 self.xsize = 0 

569 self.ysize = 0 

570 self.xoff = 0 

571 self.yoff = 0 

572 

573 def extents(self): 

574 return self.xoff, self.yoff, self.xoff + self.xsize, self.yoff + self.ysize 

575 

576 

577class PyCodec: 

578 def __init__(self, mode, *args): 

579 self.im = None 

580 self.state = PyCodecState() 

581 self.fd = None 

582 self.mode = mode 

583 self.init(args) 

584 

585 def init(self, args): 

586 """ 

587 Override to perform codec specific initialization 

588 

589 :param args: Array of args items from the tile entry 

590 :returns: None 

591 """ 

592 self.args = args 

593 

594 def cleanup(self): 

595 """ 

596 Override to perform codec specific cleanup 

597 

598 :returns: None 

599 """ 

600 pass 

601 

602 def setfd(self, fd): 

603 """ 

604 Called from ImageFile to set the Python file-like object 

605 

606 :param fd: A Python file-like object 

607 :returns: None 

608 """ 

609 self.fd = fd 

610 

611 def setimage(self, im, extents=None): 

612 """ 

613 Called from ImageFile to set the core output image for the codec 

614 

615 :param im: A core image object 

616 :param extents: a 4 tuple of (x0, y0, x1, y1) defining the rectangle 

617 for this tile 

618 :returns: None 

619 """ 

620 

621 # following c code 

622 self.im = im 

623 

624 if extents: 

625 (x0, y0, x1, y1) = extents 

626 else: 

627 (x0, y0, x1, y1) = (0, 0, 0, 0) 

628 

629 if x0 == 0 and x1 == 0: 

630 self.state.xsize, self.state.ysize = self.im.size 

631 else: 

632 self.state.xoff = x0 

633 self.state.yoff = y0 

634 self.state.xsize = x1 - x0 

635 self.state.ysize = y1 - y0 

636 

637 if self.state.xsize <= 0 or self.state.ysize <= 0: 

638 raise ValueError("Size cannot be negative") 

639 

640 if ( 

641 self.state.xsize + self.state.xoff > self.im.size[0] 

642 or self.state.ysize + self.state.yoff > self.im.size[1] 

643 ): 

644 raise ValueError("Tile cannot extend outside image") 

645 

646 

647class PyDecoder(PyCodec): 

648 """ 

649 Python implementation of a format decoder. Override this class and 

650 add the decoding logic in the :meth:`decode` method. 

651 

652 See :ref:`Writing Your Own File Codec in Python<file-codecs-py>` 

653 """ 

654 

655 _pulls_fd = False 

656 

657 @property 

658 def pulls_fd(self): 

659 return self._pulls_fd 

660 

661 def decode(self, buffer): 

662 """ 

663 Override to perform the decoding process. 

664 

665 :param buffer: A bytes object with the data to be decoded. 

666 :returns: A tuple of ``(bytes consumed, errcode)``. 

667 If finished with decoding return -1 for the bytes consumed. 

668 Err codes are from :data:`.ImageFile.ERRORS`. 

669 """ 

670 raise NotImplementedError() 

671 

672 def set_as_raw(self, data, rawmode=None): 

673 """ 

674 Convenience method to set the internal image from a stream of raw data 

675 

676 :param data: Bytes to be set 

677 :param rawmode: The rawmode to be used for the decoder. 

678 If not specified, it will default to the mode of the image 

679 :returns: None 

680 """ 

681 

682 if not rawmode: 

683 rawmode = self.mode 

684 d = Image._getdecoder(self.mode, "raw", rawmode) 

685 d.setimage(self.im, self.state.extents()) 

686 s = d.decode(data) 

687 

688 if s[0] >= 0: 

689 raise ValueError("not enough image data") 

690 if s[1] != 0: 

691 raise ValueError("cannot decode image data") 

692 

693 

694class PyEncoder(PyCodec): 

695 """ 

696 Python implementation of a format encoder. Override this class and 

697 add the decoding logic in the :meth:`encode` method. 

698 

699 See :ref:`Writing Your Own File Codec in Python<file-codecs-py>` 

700 """ 

701 

702 _pushes_fd = False 

703 

704 @property 

705 def pushes_fd(self): 

706 return self._pushes_fd 

707 

708 def encode(self, bufsize): 

709 """ 

710 Override to perform the encoding process. 

711 

712 :param bufsize: Buffer size. 

713 :returns: A tuple of ``(bytes encoded, errcode, bytes)``. 

714 If finished with encoding return 1 for the error code. 

715 Err codes are from :data:`.ImageFile.ERRORS`. 

716 """ 

717 raise NotImplementedError() 

718 

719 def encode_to_pyfd(self): 

720 """ 

721 If ``pushes_fd`` is ``True``, then this method will be used, 

722 and ``encode()`` will only be called once. 

723 

724 :returns: A tuple of ``(bytes consumed, errcode)``. 

725 Err codes are from :data:`.ImageFile.ERRORS`. 

726 """ 

727 if not self.pushes_fd: 

728 return 0, -8 # bad configuration 

729 bytes_consumed, errcode, data = self.encode(0) 

730 if data: 

731 self.fd.write(data) 

732 return bytes_consumed, errcode 

733 

734 def encode_to_file(self, fh, bufsize): 

735 """ 

736 :param fh: File handle. 

737 :param bufsize: Buffer size. 

738 

739 :returns: If finished successfully, return 0. 

740 Otherwise, return an error code. Err codes are from 

741 :data:`.ImageFile.ERRORS`. 

742 """ 

743 errcode = 0 

744 while errcode == 0: 

745 status, errcode, buf = self.encode(bufsize) 

746 if status > 0: 

747 fh.write(buf[status:]) 

748 return errcode