Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/xlrd/compdoc.py: 7%
318 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1# -*- coding: utf-8 -*-
2# Copyright (c) 2005-2012 Stephen John Machin, Lingfo Pty Ltd
3# This module is part of the xlrd package, which is released under a
4# BSD-style licence.
5# No part of the content of this file was derived from the works of
6# David Giffin.
7"""
8Implements the minimal functionality required
9to extract a "Workbook" or "Book" stream (as one big string)
10from an OLE2 Compound Document file.
11"""
12from __future__ import print_function
14import array
15import sys
16from struct import unpack
18from .timemachine import *
#: Magic cookie that should appear in the first 8 bytes of the file.
SIGNATURE = b"\xD0\xCF\x11\xE0\xA1\xB1\x1A\xE1"

# Special (negative) sector ids. Real sector numbers are >= 0.
# EOCSID: the id that terminates a sector chain; the chain walkers in
# CompDoc assert that the id which ends a chain equals this value.
EOCSID = -2
# FREESID: the MSAT may be padded with FREESID entries; CompDoc skips them.
FREESID = -1
# SATSID: not referenced by the code in this module.
# NOTE(review): presumably marks a sector that holds part of the SAT -- confirm.
SATSID = -3
# MSATSID: treated as a terminator when following the MSAT extension chain.
# NOTE(review): presumably marks a sector that holds part of the MSAT -- confirm.
MSATSID = -4
# EVILSID: written by CompDoc over MSAT/SAT entries that refer to sectors
# beyond the end of the file.
EVILSID = -5
class CompDocError(Exception):
    """Raised when the input is not an OLE2 compound document or is corrupt."""
    pass
class DirNode(object):
    """
    One entry of the compound document's directory stream.

    :param DID:
      Index of this entry in the directory (its "directory id").
    :param dent:
      The 128-byte directory entry, as a bytes object.
    :param DEBUG:
      If true, the entry is dumped to ``logfile`` once it has been parsed.
    :param logfile:
      File-like object that receives the debug output.
    """

    # The entry name occupies the first 64 bytes of the entry (the fixed
    # fields unpacked below start at offset 64).
    _NAME_FIELD_SIZE = 64

    def __init__(self, DID, dent, DEBUG=0, logfile=sys.stdout):
        # dent is the 128-byte directory entry
        self.DID = DID
        self.logfile = logfile
        (cbufsize, self.etype, self.colour, self.left_DID, self.right_DID,
         self.root_DID) = unpack('<HBBiii', dent[64:80])
        (self.first_SID, self.tot_size) = unpack('<ii', dent[116:124])
        # cbufsize counts the bytes used by the name *including* the trailing
        # U+0000. A corrupt value must not make the slice run backwards
        # (cbufsize == 1 used to give dent[0:-1]) or reach past the name
        # field, so clamp it and keep a whole number of UTF-16 code units.
        name_len = max(0, min(cbufsize, self._NAME_FIELD_SIZE) - 2)
        name_len -= name_len % 2
        if name_len:
            self.name = dent[0:name_len].decode('utf_16_le')
        else:
            self.name = u''
        self.children = []  # filled in later
        self.parent = -1  # indicates orphan; fixed up later
        self.tsinfo = unpack('<IIII', dent[100:116])
        if DEBUG:
            self.dump(DEBUG)

    def dump(self, DEBUG=1):
        """
        Write a one-line description of this entry to ``self.logfile``.

        :param DEBUG:
          When equal to 2 the raw timestamp words are written as well.
        """
        fprintf(
            self.logfile,
            "DID=%d name=%r etype=%d DIDs(left=%d right=%d root=%d parent=%d kids=%r) first_SID=%d tot_size=%d\n",
            self.DID, self.name, self.etype, self.left_DID,
            self.right_DID, self.root_DID, self.parent, self.children, self.first_SID, self.tot_size
        )
        if DEBUG == 2:
            # cre_lo, cre_hi, mod_lo, mod_hi = tsinfo
            print("timestamp info", self.tsinfo, file=self.logfile)
64def _build_family_tree(dirlist, parent_DID, child_DID):
65 if child_DID < 0: return
66 _build_family_tree(dirlist, parent_DID, dirlist[child_DID].left_DID)
67 dirlist[parent_DID].children.append(child_DID)
68 dirlist[child_DID].parent = parent_DID
69 _build_family_tree(dirlist, parent_DID, dirlist[child_DID].right_DID)
70 if dirlist[child_DID].etype == 1: # storage
71 _build_family_tree(dirlist, child_DID, dirlist[child_DID].root_DID)
class CompDoc(object):
    """
    Compound document handler.

    :param mem:
      The raw contents of the file, as a string, or as an :class:`mmap.mmap`
      object. The only operation it needs to support is slicing.
    :param logfile:
      File-like object that receives warnings and debug output.
    :param DEBUG:
      0 is quiet; higher values produce progressively more output.
    :param ignore_workbook_corruption:
      If true, :meth:`locate_named_stream` tolerates a stream sector that
      has already been claimed by something else instead of raising.
    :raises CompDocError:
      if ``mem`` is not an OLE2 compound document or is found to be corrupt.
    """

    def __init__(self, mem, logfile=sys.stdout, DEBUG=0, ignore_workbook_corruption=False):
        self.logfile = logfile
        self.ignore_workbook_corruption = ignore_workbook_corruption
        self.DEBUG = DEBUG
        if mem[0:8] != SIGNATURE:
            raise CompDocError('Not an OLE2 compound document')
        if mem[28:30] != b'\xFE\xFF':
            raise CompDocError('Expected "little-endian" marker, found %r' % mem[28:30])
        revision, version = unpack('<HH', mem[24:28])
        if DEBUG:
            print("\nCompDoc format: version=0x%04x revision=0x%04x" % (version, revision), file=logfile)
        self.mem = mem
        # Sector sizes are stored as powers of two.
        ssz, sssz = unpack('<HH', mem[30:34])
        if ssz > 20:  # allows for 2**20 bytes i.e. 1MB
            print("WARNING: sector size (2**%d) is preposterous; assuming 512 and continuing ..."
                  % ssz, file=logfile)
            ssz = 9
        if sssz > ssz:
            print("WARNING: short stream sector size (2**%d) is preposterous; assuming 64 and continuing ..."
                  % sssz, file=logfile)
            sssz = 6
        self.sec_size = sec_size = 1 << ssz
        self.short_sec_size = 1 << sssz
        if self.sec_size != 512 or self.short_sec_size != 64:
            print("@@@@ sec_size=%d short_sec_size=%d" % (self.sec_size, self.short_sec_size), file=logfile)
        (
            SAT_tot_secs, self.dir_first_sec_sid, _unused, self.min_size_std_stream,
            SSAT_first_sec_sid, SSAT_tot_secs,
            MSATX_first_sec_sid, MSATX_tot_secs,
        ) = unpack('<iiiiiiii', mem[44:76])
        # Everything after the 512-byte header is sector data.
        mem_data_len = len(mem) - 512
        mem_data_secs, left_over = divmod(mem_data_len, sec_size)
        if left_over:
            # A partial trailing sector is tolerated and counted as a sector.
            mem_data_secs += 1
            print("WARNING *** file size (%d) not 512 + multiple of sector size (%d)"
                  % (len(mem), sec_size), file=logfile)
        self.mem_data_secs = mem_data_secs  # use for checking later
        self.mem_data_len = mem_data_len
        # One unsigned byte per sector: 0 means unclaimed, anything else
        # identifies the structure that claimed the sector.
        seen = self.seen = array.array('B', [0]) * mem_data_secs

        if DEBUG:
            print('sec sizes', ssz, sssz, sec_size, self.short_sec_size, file=logfile)
            print("mem data: %d bytes == %d sectors" % (mem_data_len, mem_data_secs), file=logfile)
            print("SAT_tot_secs=%d, dir_first_sec_sid=%d, min_size_std_stream=%d"
                  % (SAT_tot_secs, self.dir_first_sec_sid, self.min_size_std_stream,), file=logfile)
            print("SSAT_first_sec_sid=%d, SSAT_tot_secs=%d" % (SSAT_first_sec_sid, SSAT_tot_secs,), file=logfile)
            print("MSATX_first_sec_sid=%d, MSATX_tot_secs=%d" % (MSATX_first_sec_sid, MSATX_tot_secs,), file=logfile)
        nent = sec_size // 4  # number of SID entries in a sector
        fmt = "<%di" % nent
        trunc_warned = 0
        #
        # === build the MSAT ===
        #
        # The first 109 entries live in the header; any further entries are
        # in a chain of extension sectors.
        MSAT = list(unpack('<109i', mem[76:512]))
        SAT_sectors_reqd = (mem_data_secs + nent - 1) // nent
        expected_MSATX_sectors = max(0, (SAT_sectors_reqd - 109 + nent - 2) // (nent - 1))
        actual_MSATX_sectors = 0
        if MSATX_tot_secs == 0 and MSATX_first_sec_sid in (EOCSID, FREESID, 0):
            # Strictly, if there is no MSAT extension, then MSATX_first_sec_sid
            # should be set to EOCSID ... FREESID and 0 have been met in the wild.
            pass  # Presuming no extension
        else:
            sid = MSATX_first_sec_sid
            while sid not in (EOCSID, FREESID, MSATSID):
                # Above should be only EOCSID according to MS & OOo docs
                # but Excel doesn't complain about FREESID. Zero is a valid
                # sector number, not a sentinel.
                if DEBUG > 1:
                    print('MSATX: sid=%d (0x%08X)' % (sid, sid), file=logfile)
                if sid >= mem_data_secs:
                    msg = "MSAT extension: accessing sector %d but only %d in file" % (sid, mem_data_secs)
                    if DEBUG > 1:
                        # In verbose debug mode carry on with what we have.
                        print(msg, file=logfile)
                        break
                    raise CompDocError(msg)
                elif sid < 0:
                    raise CompDocError("MSAT extension: invalid sector id: %d" % sid)
                if seen[sid]:
                    raise CompDocError("MSAT corruption: seen[%d] == %d" % (sid, seen[sid]))
                seen[sid] = 1
                actual_MSATX_sectors += 1
                if DEBUG and actual_MSATX_sectors > expected_MSATX_sectors:
                    print("[1]===>>>", mem_data_secs, nent, SAT_sectors_reqd, expected_MSATX_sectors, actual_MSATX_sectors, file=logfile)
                offset = 512 + sec_size * sid
                MSAT.extend(unpack(fmt, mem[offset:offset+sec_size]))
                sid = MSAT.pop()  # last sector id is sid of next sector in the chain

        if DEBUG and actual_MSATX_sectors != expected_MSATX_sectors:
            print("[2]===>>>", mem_data_secs, nent, SAT_sectors_reqd, expected_MSATX_sectors, actual_MSATX_sectors, file=logfile)
        if DEBUG:
            print("MSAT: len =", len(MSAT), file=logfile)
            dump_list(MSAT, 10, logfile)
        #
        # === build the SAT ===
        #
        self.SAT = []
        actual_SAT_sectors = 0
        dump_again = 0
        for msidx, msid in enumerate(MSAT):
            if msid in (FREESID, EOCSID):
                # Specification: the MSAT array may be padded with trailing FREESID entries.
                # Toleration: a FREESID or EOCSID entry anywhere in the MSAT array will be ignored.
                continue
            if msid >= mem_data_secs:
                if not trunc_warned:
                    print("WARNING *** File is truncated, or OLE2 MSAT is corrupt!!", file=logfile)
                    print("INFO: Trying to access sector %d but only %d available"
                          % (msid, mem_data_secs), file=logfile)
                    trunc_warned = 1
                MSAT[msidx] = EVILSID
                dump_again = 1
                continue
            elif msid < -2:
                raise CompDocError("MSAT: invalid sector id: %d" % msid)
            if seen[msid]:
                raise CompDocError("MSAT extension corruption: seen[%d] == %d" % (msid, seen[msid]))
            seen[msid] = 2
            actual_SAT_sectors += 1
            if DEBUG and actual_SAT_sectors > SAT_sectors_reqd:
                print("[3]===>>>", mem_data_secs, nent, SAT_sectors_reqd, expected_MSATX_sectors, actual_MSATX_sectors, actual_SAT_sectors, msid, file=logfile)
            offset = 512 + sec_size * msid
            self.SAT.extend(unpack(fmt, mem[offset:offset+sec_size]))

        if DEBUG:
            print("SAT: len =", len(self.SAT), file=logfile)
            dump_list(self.SAT, 10, logfile)
            print(file=logfile)
        if DEBUG and dump_again:
            print("MSAT: len =", len(MSAT), file=logfile)
            dump_list(MSAT, 10, logfile)
            for satx in xrange(mem_data_secs, len(self.SAT)):
                self.SAT[satx] = EVILSID
            print("SAT: len =", len(self.SAT), file=logfile)
            dump_list(self.SAT, 10, logfile)
        #
        # === build the directory ===
        #
        dbytes = self._get_stream(
            self.mem, 512, self.SAT, self.sec_size, self.dir_first_sec_sid,
            name="directory", seen_id=3)
        dirlist = []
        did = -1
        for pos in xrange(0, len(dbytes), 128):
            did += 1
            dirlist.append(DirNode(did, dbytes[pos:pos+128], 0, logfile))
        self.dirlist = dirlist
        _build_family_tree(dirlist, 0, dirlist[0].root_DID)  # and stand well back ...
        if DEBUG:
            for d in dirlist:
                d.dump(DEBUG)
        #
        # === get the SSCS ===
        #
        sscs_dir = self.dirlist[0]
        assert sscs_dir.etype == 5  # root entry
        if sscs_dir.first_SID < 0 or sscs_dir.tot_size == 0:
            # Problem reported by Frank Hoffsuemmer: some software was
            # writing -1 instead of -2 (EOCSID) for the first_SID
            # when the SCCS was empty. Not having EOCSID caused assertion
            # failure in _get_stream.
            # Solution: avoid calling _get_stream in any case when the
            # SCSS appears to be empty.
            # Bytes, like the value _get_stream returns in the other branch.
            self.SSCS = b""
        else:
            self.SSCS = self._get_stream(
                self.mem, 512, self.SAT, sec_size, sscs_dir.first_SID,
                sscs_dir.tot_size, name="SSCS", seen_id=4)
        #
        # === build the SSAT ===
        #
        self.SSAT = []
        if SSAT_tot_secs > 0 and sscs_dir.tot_size == 0:
            print("WARNING *** OLE2 inconsistency: SSCS size is 0 but SSAT size is non-zero", file=logfile)
        if sscs_dir.tot_size > 0:
            sid = SSAT_first_sec_sid
            nsecs = SSAT_tot_secs
            while sid >= 0 and nsecs > 0:
                if seen[sid]:
                    raise CompDocError("SSAT corruption: seen[%d] == %d" % (sid, seen[sid]))
                seen[sid] = 5
                nsecs -= 1
                start_pos = 512 + sid * sec_size
                news = list(unpack(fmt, mem[start_pos:start_pos+sec_size]))
                self.SSAT.extend(news)
                sid = self.SAT[sid]
            if DEBUG:
                print("SSAT last sid %d; remaining sectors %d" % (sid, nsecs), file=logfile)
            assert nsecs == 0 and sid == EOCSID
        if DEBUG:
            print("SSAT", file=logfile)
            dump_list(self.SSAT, 10, logfile)
        if DEBUG:
            print("seen", file=logfile)
            dump_list(seen, 20, logfile)

    @staticmethod
    def _stream_seen_id(did):
        """
        Return the marker stored in ``self.seen`` for the stream with this DID.

        Markers 1 to 5 are used for the document's own structures, so streams
        use ``DID + 6``. ``self.seen`` holds unsigned bytes, hence the value
        is capped at 255; an uncapped marker raised ``OverflowError`` for any
        DID of 250 or more. The marker is only tested for being non-zero and
        echoed in error messages.
        """
        return min(did + 6, 255)

    def _get_stream(self, mem, base, sat, sec_size, start_sid, size=None, name='', seen_id=None):
        """
        Follow a sector chain and return the concatenated sector contents.

        :param mem: Sliceable object that holds the sectors.
        :param base: Offset in ``mem`` of sector 0.
        :param sat: Allocation table; ``sat[s]`` is the sector that follows ``s``.
        :param sec_size: Size of one sector in bytes.
        :param start_sid: First sector of the chain; negative means empty.
        :param size: Expected stream size in bytes, or ``None`` if unknown.
          When given, no more than ``size`` bytes are returned, and a warning
          is written to ``self.logfile`` if the chain supplies fewer.
        :param name: Stream name, used in messages.
        :param seen_id: If not ``None``, every sector of the chain is claimed
          in ``self.seen`` with this value.
        :returns: The stream contents as a byte string.
        :raises CompDocError: if a sector has already been claimed, if the
          chain refers to an entry beyond the end of ``sat``, or if the chain
          is cyclic.
        """
        sectors = []
        todo = size
        # A chain without a cycle cannot visit more sectors than the table has
        # entries. Without this bound a cyclic chain would loop forever when
        # seen_id is None, because nothing else detects the repeat.
        max_chain = len(sat)
        s = start_sid
        while s >= 0:
            if seen_id is not None:
                if self.seen[s]:
                    raise CompDocError("%s corruption: seen[%d] == %d" % (name, s, self.seen[s]))
                self.seen[s] = seen_id
            if len(sectors) > max_chain:
                raise CompDocError(
                    "%s corruption: sector chain is cyclic (more than %d sectors)"
                    % (name, max_chain)
                )
            start_pos = base + s * sec_size
            if todo is None:
                # nothing to check against
                grab = sec_size
            else:
                grab = sec_size
                if grab > todo:
                    grab = todo
                todo -= grab
            sectors.append(mem[start_pos:start_pos+grab])
            try:
                s = sat[s]
            except IndexError:
                raise CompDocError(
                    "OLE2 stream %r: sector allocation table invalid entry (%d)" %
                    (name, s)
                )
        assert s == EOCSID
        if todo is not None and todo != 0:
            fprintf(self.logfile,
                    "WARNING *** OLE2 stream %r: expected size %d, actual size %d\n",
                    name, size, size - todo)

        return b''.join(sectors)

    def _dir_search(self, path, storage_DID=0):
        """
        Return the matching DirNode instance, or None.

        :param path: List of name components; the last one names the stream.
        :param storage_DID: DID of the storage whose children are searched.
        :raises CompDocError: if the path ends at a storage, or a component
          is neither a storage nor a user stream.
        """
        head = path[0]
        tail = path[1:]
        wanted = head.lower()  # names are compared case-insensitively
        dl = self.dirlist
        for child in dl[storage_DID].children:
            if dl[child].name.lower() == wanted:
                et = dl[child].etype
                if et == 2:
                    return dl[child]
                if et == 1:
                    if not tail:
                        raise CompDocError("Requested component is a 'storage'")
                    return self._dir_search(tail, child)
                dl[child].dump(1)
                raise CompDocError("Requested stream is not a 'user stream'")
        return None

    def get_named_stream(self, qname):
        """
        Interrogate the compound document's directory; return the stream as a
        string if found, otherwise return ``None``.

        :param qname:
          Name of the desired stream e.g. ``'Workbook'``.
          Should be in Unicode or convertible thereto.
        """
        d = self._dir_search(qname.split("/"))
        if d is None:
            return None
        if d.tot_size >= self.min_size_std_stream:
            return self._get_stream(
                self.mem, 512, self.SAT, self.sec_size, d.first_SID,
                d.tot_size, name=qname, seen_id=self._stream_seen_id(d.DID))
        else:
            # Small streams are stored in the short-stream container.
            return self._get_stream(
                self.SSCS, 0, self.SSAT, self.short_sec_size, d.first_SID,
                d.tot_size, name=qname + " (from SSCS)", seen_id=None)

    def locate_named_stream(self, qname):
        """
        Interrogate the compound document's directory.

        If the named stream is not found, ``(None, 0, 0)`` will be returned.

        If the named stream is found and is contiguous within the original
        byte sequence (``mem``) used when the document was opened,
        then ``(mem, offset_to_start_of_stream, length_of_stream)`` is returned.

        Otherwise a new string is built from the fragments and
        ``(new_string, 0, length_of_stream)`` is returned.

        :param qname:
          Name of the desired stream e.g. ``'Workbook'``.
          Should be in Unicode or convertible thereto.
        :raises CompDocError:
          if the stream claims to be larger than the file's data area, or
          its sector chain is corrupt.
        """
        d = self._dir_search(qname.split("/"))
        if d is None:
            return (None, 0, 0)
        if d.tot_size > self.mem_data_len:
            raise CompDocError("%r stream length (%d bytes) > file data size (%d bytes)"
                               % (qname, d.tot_size, self.mem_data_len))
        if d.tot_size >= self.min_size_std_stream:
            result = self._locate_stream(
                self.mem, 512, self.SAT, self.sec_size, d.first_SID,
                d.tot_size, qname, self._stream_seen_id(d.DID))
            if self.DEBUG:
                print("\nseen", file=self.logfile)
                dump_list(self.seen, 20, self.logfile)
            return result
        else:
            return (
                self._get_stream(
                    self.SSCS, 0, self.SSAT, self.short_sec_size, d.first_SID,
                    d.tot_size, qname + " (from SSCS)", None),
                0,
                d.tot_size,
            )

    def _locate_stream(self, mem, base, sat, sec_size, start_sid, expected_stream_size, qname, seen_id):
        """
        Locate a stream in ``mem``, copying only if it is fragmented.

        :returns: ``(mem, start_offset, expected_stream_size)`` when all the
          stream's sectors are consecutive; otherwise
          ``(joined_sectors, 0, expected_stream_size)``.
        :raises CompDocError: if ``start_sid`` is negative, a sector has
          already been claimed (unless ``ignore_workbook_corruption`` is set),
          the chain is longer than ``expected_stream_size`` allows, or the
          chain refers to an entry beyond the end of ``sat``.
        """
        s = start_sid
        if s < 0:
            raise CompDocError("_locate_stream: start_sid (%d) is -ve" % start_sid)
        p = -99  # dummy previous SID
        start_pos = -9999
        end_pos = -8888
        slices = []
        tot_found = 0
        found_limit = (expected_stream_size + sec_size - 1) // sec_size
        while s >= 0:
            if self.seen[s]:
                if not self.ignore_workbook_corruption:
                    print("_locate_stream(%s): seen" % qname, file=self.logfile)
                    dump_list(self.seen, 20, self.logfile)
                    raise CompDocError("%s corruption: seen[%d] == %d" % (qname, s, self.seen[s]))
            self.seen[s] = seen_id
            tot_found += 1
            if tot_found > found_limit:
                # Note: expected size rounded up to higher sector
                raise CompDocError(
                    "%s: size exceeds expected %d bytes; corrupt?"
                    % (qname, found_limit * sec_size)
                )
            if s == p+1:
                # contiguous sectors
                end_pos += sec_size
            else:
                # start new slice
                if p >= 0:
                    # not first time
                    slices.append((start_pos, end_pos))
                start_pos = base + s * sec_size
                end_pos = start_pos + sec_size
            p = s
            try:
                s = sat[s]
            except IndexError:
                # Same report as _get_stream gives for the same corruption.
                raise CompDocError(
                    "OLE2 stream %r: sector allocation table invalid entry (%d)" %
                    (qname, s)
                )
        assert s == EOCSID
        assert tot_found == found_limit
        if not slices:
            # The stream is contiguous ... just what we like!
            return (mem, start_pos, expected_stream_size)
        slices.append((start_pos, end_pos))
        return (b''.join(mem[start_pos:end_pos] for start_pos, end_pos in slices), 0, expected_stream_size)
460# ==========================================================================================
def x_dump_line(alist, stride, f, dpos, equal=0):
    """
    Write one line of a list dump to ``f``.

    The line holds the position ``dpos`` right-aligned in 5 columns, a marker
    (``"="`` when ``equal`` is 1, a space when it is 0), and then the
    ``stride`` items of ``alist`` that start at ``dpos``, each followed by a
    space.
    """
    label = "%5d%s" % (dpos, " ="[equal])
    cells = "".join(str(item) + " " for item in alist[dpos:dpos + stride])
    print(label + " " + cells, file=f)
def dump_list(alist, stride, f=sys.stdout):
    """
    Write ``alist`` to ``f``, ``stride`` items per line, collapsing repeats.

    A line is written only when its items differ from those of the line
    written before it. A run of identical lines is therefore shown as its
    first line plus, if the run covers more than two lines or ends the list,
    its last line marked with ``=``.
    """
    def _emit(start, repeated=0):
        # Position, marker, then the items of the line that starts at `start`.
        print("%5d%s" % (start, " ="[repeated]), end=' ', file=f)
        for item in alist[start:start + stride]:
            print(str(item), end=' ', file=f)
        print(file=f)

    current = None  # start of the line being examined
    shown = None  # start of the most recent line written as "new"
    for current in xrange(0, len(alist), stride):
        if shown is not None:
            if alist[current:current + stride] == alist[shown:shown + stride]:
                continue  # still inside a run of identical lines
            if current - shown > stride:
                # Close the run that has just ended with its last line.
                _emit(current - stride, 1)
        _emit(current)
        shown = current
    if shown is not None and current is not None and current != shown:
        # The list ended in the middle of a run.
        _emit(current, 1)