Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/xlrd/compdoc.py: 7%

318 statements  

coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

# -*- coding: utf-8 -*-
# Copyright (c) 2005-2012 Stephen John Machin, Lingfo Pty Ltd
# This module is part of the xlrd package, which is released under a
# BSD-style licence.
# No part of the content of this file was derived from the works of
# David Giffin.
"""
Implements the minimal functionality required
to extract a "Workbook" or "Book" stream (as one big string)
from an OLE2 Compound Document file.
"""
from __future__ import print_function

import array
import sys
from struct import unpack

from .timemachine import *

#: Magic cookie that should appear in the first 8 bytes of the file.
SIGNATURE = b"\xD0\xCF\x11\xE0\xA1\xB1\x1A\xE1"

EOCSID = -2
FREESID = -1
SATSID = -3
MSATSID = -4
EVILSID = -5

class CompDocError(Exception):
    pass

class DirNode(object):

    def __init__(self, DID, dent, DEBUG=0, logfile=sys.stdout):
        # dent is the 128-byte directory entry
        self.DID = DID
        self.logfile = logfile
        (cbufsize, self.etype, self.colour, self.left_DID, self.right_DID,
            self.root_DID) = \
            unpack('<HBBiii', dent[64:80])
        (self.first_SID, self.tot_size) = \
            unpack('<ii', dent[116:124])
        if cbufsize == 0:
            self.name = UNICODE_LITERAL('')
        else:
            self.name = unicode(dent[0:cbufsize-2], 'utf_16_le') # omit the trailing U+0000
        self.children = [] # filled in later
        self.parent = -1 # indicates orphan; fixed up later
        self.tsinfo = unpack('<IIII', dent[100:116])
        if DEBUG:
            self.dump(DEBUG)

    def dump(self, DEBUG=1):
        fprintf(
            self.logfile,
            "DID=%d name=%r etype=%d DIDs(left=%d right=%d root=%d parent=%d kids=%r) first_SID=%d tot_size=%d\n",
            self.DID, self.name, self.etype, self.left_DID,
            self.right_DID, self.root_DID, self.parent, self.children, self.first_SID, self.tot_size
        )
        if DEBUG == 2:
            # cre_lo, cre_hi, mod_lo, mod_hi = tsinfo
            print("timestamp info", self.tsinfo, file=self.logfile)

def _build_family_tree(dirlist, parent_DID, child_DID):
    if child_DID < 0: return
    _build_family_tree(dirlist, parent_DID, dirlist[child_DID].left_DID)
    dirlist[parent_DID].children.append(child_DID)
    dirlist[child_DID].parent = parent_DID
    _build_family_tree(dirlist, parent_DID, dirlist[child_DID].right_DID)
    if dirlist[child_DID].etype == 1: # storage
        _build_family_tree(dirlist, child_DID, dirlist[child_DID].root_DID)

class CompDoc(object):
    """
    Compound document handler.

    :param mem:
        The raw contents of the file, as a string, or as an :class:`mmap.mmap`
        object. The only operation it needs to support is slicing.
    """
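    # Illustrative usage sketch, not taken from xlrd itself; ``book.xls`` is a
    # hypothetical file name.  Construct a CompDoc from the raw file bytes and
    # pull out the Workbook stream, assuming the usual BIFF8 layout:
    #
    #     with open('book.xls', 'rb') as f:
    #         cd = CompDoc(f.read())
    #     workbook_bytes = cd.get_named_stream('Workbook')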

    def __init__(self, mem, logfile=sys.stdout, DEBUG=0, ignore_workbook_corruption=False):
        self.logfile = logfile
        self.ignore_workbook_corruption = ignore_workbook_corruption
        self.DEBUG = DEBUG
        if mem[0:8] != SIGNATURE:
            raise CompDocError('Not an OLE2 compound document')
        if mem[28:30] != b'\xFE\xFF':
            raise CompDocError('Expected "little-endian" marker, found %r' % mem[28:30])
        revision, version = unpack('<HH', mem[24:28])
        if DEBUG:
            print("\nCompDoc format: version=0x%04x revision=0x%04x" % (version, revision), file=logfile)
        self.mem = mem
        ssz, sssz = unpack('<HH', mem[30:34])
        if ssz > 20: # allows for 2**20 bytes i.e. 1MB
            print("WARNING: sector size (2**%d) is preposterous; assuming 512 and continuing ..."
                % ssz, file=logfile)
            ssz = 9
        if sssz > ssz:
            print("WARNING: short stream sector size (2**%d) is preposterous; assuming 64 and continuing ..."
                % sssz, file=logfile)
            sssz = 6
        self.sec_size = sec_size = 1 << ssz
        self.short_sec_size = 1 << sssz
        if self.sec_size != 512 or self.short_sec_size != 64:
            print("@@@@ sec_size=%d short_sec_size=%d" % (self.sec_size, self.short_sec_size), file=logfile)
        (
            SAT_tot_secs, self.dir_first_sec_sid, _unused, self.min_size_std_stream,
            SSAT_first_sec_sid, SSAT_tot_secs,
            MSATX_first_sec_sid, MSATX_tot_secs,
        ) = unpack('<iiiiiiii', mem[44:76])
        mem_data_len = len(mem) - 512
        mem_data_secs, left_over = divmod(mem_data_len, sec_size)
        if left_over:
            #### raise CompDocError("Not a whole number of sectors")
            mem_data_secs += 1
            print("WARNING *** file size (%d) not 512 + multiple of sector size (%d)"
                % (len(mem), sec_size), file=logfile)
        self.mem_data_secs = mem_data_secs # use for checking later
        self.mem_data_len = mem_data_len
        seen = self.seen = array.array('B', [0]) * mem_data_secs

        if DEBUG:
            print('sec sizes', ssz, sssz, sec_size, self.short_sec_size, file=logfile)
            print("mem data: %d bytes == %d sectors" % (mem_data_len, mem_data_secs), file=logfile)
            print("SAT_tot_secs=%d, dir_first_sec_sid=%d, min_size_std_stream=%d"
                % (SAT_tot_secs, self.dir_first_sec_sid, self.min_size_std_stream,), file=logfile)
            print("SSAT_first_sec_sid=%d, SSAT_tot_secs=%d" % (SSAT_first_sec_sid, SSAT_tot_secs,), file=logfile)
            print("MSATX_first_sec_sid=%d, MSATX_tot_secs=%d" % (MSATX_first_sec_sid, MSATX_tot_secs,), file=logfile)
        nent = sec_size // 4 # number of SID entries in a sector
        fmt = "<%di" % nent
        trunc_warned = 0
        #
        # === build the MSAT ===
        #
        MSAT = list(unpack('<109i', mem[76:512]))
        SAT_sectors_reqd = (mem_data_secs + nent - 1) // nent
        expected_MSATX_sectors = max(0, (SAT_sectors_reqd - 109 + nent - 2) // (nent - 1))
        actual_MSATX_sectors = 0
        if MSATX_tot_secs == 0 and MSATX_first_sec_sid in (EOCSID, FREESID, 0):
            # Strictly, if there is no MSAT extension, then MSATX_first_sec_sid
            # should be set to EOCSID ... FREESID and 0 have been met in the wild.
            pass # Presuming no extension
        else:
            sid = MSATX_first_sec_sid
            while sid not in (EOCSID, FREESID, MSATSID):
                # Above should be only EOCSID according to MS & OOo docs
                # but Excel doesn't complain about FREESID. Zero is a valid
                # sector number, not a sentinel.
                if DEBUG > 1:
                    print('MSATX: sid=%d (0x%08X)' % (sid, sid), file=logfile)
                if sid >= mem_data_secs:
                    msg = "MSAT extension: accessing sector %d but only %d in file" % (sid, mem_data_secs)
                    if DEBUG > 1:
                        print(msg, file=logfile)
                        break
                    raise CompDocError(msg)
                elif sid < 0:
                    raise CompDocError("MSAT extension: invalid sector id: %d" % sid)
                if seen[sid]:
                    raise CompDocError("MSAT corruption: seen[%d] == %d" % (sid, seen[sid]))
                seen[sid] = 1
                actual_MSATX_sectors += 1
                if DEBUG and actual_MSATX_sectors > expected_MSATX_sectors:
                    print("[1]===>>>", mem_data_secs, nent, SAT_sectors_reqd, expected_MSATX_sectors, actual_MSATX_sectors, file=logfile)
                offset = 512 + sec_size * sid
                MSAT.extend(unpack(fmt, mem[offset:offset+sec_size]))
                sid = MSAT.pop() # last sector id is sid of next sector in the chain

        if DEBUG and actual_MSATX_sectors != expected_MSATX_sectors:
            print("[2]===>>>", mem_data_secs, nent, SAT_sectors_reqd, expected_MSATX_sectors, actual_MSATX_sectors, file=logfile)
        if DEBUG:
            print("MSAT: len =", len(MSAT), file=logfile)
            dump_list(MSAT, 10, logfile)
        #
        # === build the SAT ===
        #
        self.SAT = []
        actual_SAT_sectors = 0
        dump_again = 0
        for msidx in xrange(len(MSAT)):
            msid = MSAT[msidx]
            if msid in (FREESID, EOCSID):
                # Specification: the MSAT array may be padded with trailing FREESID entries.
                # Toleration: a FREESID or EOCSID entry anywhere in the MSAT array will be ignored.
                continue
            if msid >= mem_data_secs:
                if not trunc_warned:
                    print("WARNING *** File is truncated, or OLE2 MSAT is corrupt!!", file=logfile)
                    print("INFO: Trying to access sector %d but only %d available"
                        % (msid, mem_data_secs), file=logfile)
                    trunc_warned = 1
                MSAT[msidx] = EVILSID
                dump_again = 1
                continue
            elif msid < -2:
                raise CompDocError("MSAT: invalid sector id: %d" % msid)
            if seen[msid]:
                raise CompDocError("MSAT extension corruption: seen[%d] == %d" % (msid, seen[msid]))
            seen[msid] = 2
            actual_SAT_sectors += 1
            if DEBUG and actual_SAT_sectors > SAT_sectors_reqd:
                print("[3]===>>>", mem_data_secs, nent, SAT_sectors_reqd, expected_MSATX_sectors, actual_MSATX_sectors, actual_SAT_sectors, msid, file=logfile)
            offset = 512 + sec_size * msid
            self.SAT.extend(unpack(fmt, mem[offset:offset+sec_size]))

        if DEBUG:
            print("SAT: len =", len(self.SAT), file=logfile)
            dump_list(self.SAT, 10, logfile)
            # print >> logfile, "SAT ",
            # for i, s in enumerate(self.SAT):
            #     print >> logfile, "entry: %4d offset: %6d, next entry: %4d" % (i, 512 + sec_size * i, s)
            #     print >> logfile, "%d:%d " % (i, s),
            print(file=logfile)
        if DEBUG and dump_again:
            print("MSAT: len =", len(MSAT), file=logfile)
            dump_list(MSAT, 10, logfile)
            for satx in xrange(mem_data_secs, len(self.SAT)):
                self.SAT[satx] = EVILSID
            print("SAT: len =", len(self.SAT), file=logfile)
            dump_list(self.SAT, 10, logfile)
        #
        # === build the directory ===
        #
        dbytes = self._get_stream(
            self.mem, 512, self.SAT, self.sec_size, self.dir_first_sec_sid,
            name="directory", seen_id=3)
        dirlist = []
        did = -1
        for pos in xrange(0, len(dbytes), 128):
            did += 1
            dirlist.append(DirNode(did, dbytes[pos:pos+128], 0, logfile))
        self.dirlist = dirlist
        _build_family_tree(dirlist, 0, dirlist[0].root_DID) # and stand well back ...
        if DEBUG:
            for d in dirlist:
                d.dump(DEBUG)
        #
        # === get the SSCS ===
        #
        sscs_dir = self.dirlist[0]
        assert sscs_dir.etype == 5 # root entry
        if sscs_dir.first_SID < 0 or sscs_dir.tot_size == 0:
            # Problem reported by Frank Hoffsuemmer: some software was
            # writing -1 instead of -2 (EOCSID) for the first_SID
            # when the SSCS was empty. Not having EOCSID caused assertion
            # failure in _get_stream.
            # Solution: avoid calling _get_stream in any case when the
            # SSCS appears to be empty.
            self.SSCS = ""
        else:
            self.SSCS = self._get_stream(
                self.mem, 512, self.SAT, sec_size, sscs_dir.first_SID,
                sscs_dir.tot_size, name="SSCS", seen_id=4)
        # if DEBUG: print >> logfile, "SSCS", repr(self.SSCS)
        #
        # === build the SSAT ===
        #
        self.SSAT = []
        if SSAT_tot_secs > 0 and sscs_dir.tot_size == 0:
            print("WARNING *** OLE2 inconsistency: SSCS size is 0 but SSAT size is non-zero", file=logfile)
        if sscs_dir.tot_size > 0:
            sid = SSAT_first_sec_sid
            nsecs = SSAT_tot_secs
            while sid >= 0 and nsecs > 0:
                if seen[sid]:
                    raise CompDocError("SSAT corruption: seen[%d] == %d" % (sid, seen[sid]))
                seen[sid] = 5
                nsecs -= 1
                start_pos = 512 + sid * sec_size
                news = list(unpack(fmt, mem[start_pos:start_pos+sec_size]))
                self.SSAT.extend(news)
                sid = self.SAT[sid]
            if DEBUG: print("SSAT last sid %d; remaining sectors %d" % (sid, nsecs), file=logfile)
            assert nsecs == 0 and sid == EOCSID
        if DEBUG:
            print("SSAT", file=logfile)
            dump_list(self.SSAT, 10, logfile)
        if DEBUG:
            print("seen", file=logfile)
            dump_list(seen, 20, logfile)

    def _get_stream(self, mem, base, sat, sec_size, start_sid, size=None, name='', seen_id=None):
        # print >> self.logfile, "_get_stream", base, sec_size, start_sid, size
        sectors = []
        s = start_sid
        if size is None:
            # nothing to check against
            while s >= 0:
                if seen_id is not None:
                    if self.seen[s]:
                        raise CompDocError("%s corruption: seen[%d] == %d" % (name, s, self.seen[s]))
                    self.seen[s] = seen_id
                start_pos = base + s * sec_size
                sectors.append(mem[start_pos:start_pos+sec_size])
                try:
                    s = sat[s]
                except IndexError:
                    raise CompDocError(
                        "OLE2 stream %r: sector allocation table invalid entry (%d)" %
                        (name, s)
                    )
            assert s == EOCSID
        else:
            todo = size
            while s >= 0:
                if seen_id is not None:
                    if self.seen[s]:
                        raise CompDocError("%s corruption: seen[%d] == %d" % (name, s, self.seen[s]))
                    self.seen[s] = seen_id
                start_pos = base + s * sec_size
                grab = sec_size
                if grab > todo:
                    grab = todo
                todo -= grab
                sectors.append(mem[start_pos:start_pos+grab])
                try:
                    s = sat[s]
                except IndexError:
                    raise CompDocError(
                        "OLE2 stream %r: sector allocation table invalid entry (%d)" %
                        (name, s)
                    )
            assert s == EOCSID
            if todo != 0:
                fprintf(self.logfile,
                    "WARNING *** OLE2 stream %r: expected size %d, actual size %d\n",
                    name, size, size - todo)

        return b''.join(sectors)

    def _dir_search(self, path, storage_DID=0):
        # Return matching DirNode instance, or None
        head = path[0]
        tail = path[1:]
        dl = self.dirlist
        for child in dl[storage_DID].children:
            if dl[child].name.lower() == head.lower():
                et = dl[child].etype
                if et == 2:
                    return dl[child]
                if et == 1:
                    if not tail:
                        raise CompDocError("Requested component is a 'storage'")
                    return self._dir_search(tail, child)
                dl[child].dump(1)
                raise CompDocError("Requested stream is not a 'user stream'")
        return None

    def get_named_stream(self, qname):
        """
        Interrogate the compound document's directory; return the stream as a
        string if found, otherwise return ``None``.

        :param qname:
            Name of the desired stream e.g. ``'Workbook'``.
            Should be in Unicode or convertible thereto.
        """
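        # e.g. (illustrative, assuming ``cd`` is an already-constructed CompDoc):
        #
        #     data = cd.get_named_stream('Workbook')   # bytes, or None if absent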

        d = self._dir_search(qname.split("/"))
        if d is None:
            return None
        if d.tot_size >= self.min_size_std_stream:
            return self._get_stream(
                self.mem, 512, self.SAT, self.sec_size, d.first_SID,
                d.tot_size, name=qname, seen_id=d.DID+6)
        else:
            return self._get_stream(
                self.SSCS, 0, self.SSAT, self.short_sec_size, d.first_SID,
                d.tot_size, name=qname + " (from SSCS)", seen_id=None)

    def locate_named_stream(self, qname):
        """
        Interrogate the compound document's directory.

        If the named stream is not found, ``(None, 0, 0)`` will be returned.

        If the named stream is found and is contiguous within the original
        byte sequence (``mem``) used when the document was opened,
        then ``(mem, offset_to_start_of_stream, length_of_stream)`` is returned.

        Otherwise a new string is built from the fragments and
        ``(new_string, 0, length_of_stream)`` is returned.

        :param qname:
            Name of the desired stream e.g. ``'Workbook'``.
            Should be in Unicode or convertible thereto.
        """
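        # e.g. (illustrative, assuming ``cd`` is an already-constructed CompDoc):
        #
        #     mem, offset, length = cd.locate_named_stream('Workbook')
        #     if mem is not None:
        #         workbook_bytes = mem[offset:offset + length]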

        d = self._dir_search(qname.split("/"))
        if d is None:
            return (None, 0, 0)
        if d.tot_size > self.mem_data_len:
            raise CompDocError("%r stream length (%d bytes) > file data size (%d bytes)"
                % (qname, d.tot_size, self.mem_data_len))
        if d.tot_size >= self.min_size_std_stream:
            result = self._locate_stream(
                self.mem, 512, self.SAT, self.sec_size, d.first_SID,
                d.tot_size, qname, d.DID+6)
            if self.DEBUG:
                print("\nseen", file=self.logfile)
                dump_list(self.seen, 20, self.logfile)
            return result
        else:
            return (
                self._get_stream(
                    self.SSCS, 0, self.SSAT, self.short_sec_size, d.first_SID,
                    d.tot_size, qname + " (from SSCS)", None),
                0,
                d.tot_size,
            )

    def _locate_stream(self, mem, base, sat, sec_size, start_sid, expected_stream_size, qname, seen_id):
        # print >> self.logfile, "_locate_stream", base, sec_size, start_sid, expected_stream_size
        s = start_sid
        if s < 0:
            raise CompDocError("_locate_stream: start_sid (%d) is -ve" % start_sid)
        p = -99 # dummy previous SID
        start_pos = -9999
        end_pos = -8888
        slices = []
        tot_found = 0
        found_limit = (expected_stream_size + sec_size - 1) // sec_size
        while s >= 0:
            if self.seen[s]:
                if not self.ignore_workbook_corruption:
                    print("_locate_stream(%s): seen" % qname, file=self.logfile); dump_list(self.seen, 20, self.logfile)
                    raise CompDocError("%s corruption: seen[%d] == %d" % (qname, s, self.seen[s]))
            self.seen[s] = seen_id
            tot_found += 1
            if tot_found > found_limit:
                # Note: expected size rounded up to higher sector
                raise CompDocError(
                    "%s: size exceeds expected %d bytes; corrupt?"
                    % (qname, found_limit * sec_size)
                )
            if s == p+1:
                # contiguous sectors
                end_pos += sec_size
            else:
                # start new slice
                if p >= 0:
                    # not first time
                    slices.append((start_pos, end_pos))
                start_pos = base + s * sec_size
                end_pos = start_pos + sec_size
            p = s
            s = sat[s]
        assert s == EOCSID
        assert tot_found == found_limit
        # print >> self.logfile, "_locate_stream(%s): seen" % qname; dump_list(self.seen, 20, self.logfile)
        if not slices:
            # The stream is contiguous ... just what we like!
            return (mem, start_pos, expected_stream_size)
        slices.append((start_pos, end_pos))
        # print >> self.logfile, "+++>>> %d fragments" % len(slices)
        return (b''.join(mem[start_pos:end_pos] for start_pos, end_pos in slices), 0, expected_stream_size)

# ==========================================================================================
def x_dump_line(alist, stride, f, dpos, equal=0):
    print("%5d%s" % (dpos, " ="[equal]), end=' ', file=f)
    for value in alist[dpos:dpos + stride]:
        print(str(value), end=' ', file=f)
    print(file=f)

def dump_list(alist, stride, f=sys.stdout):
    def _dump_line(dpos, equal=0):
        print("%5d%s" % (dpos, " ="[equal]), end=' ', file=f)
        for value in alist[dpos:dpos + stride]:
            print(str(value), end=' ', file=f)
        print(file=f)
    pos = None
    oldpos = None
    for pos in xrange(0, len(alist), stride):
        if oldpos is None:
            _dump_line(pos)
            oldpos = pos
        elif alist[pos:pos+stride] != alist[oldpos:oldpos+stride]:
            if pos - oldpos > stride:
                _dump_line(pos - stride, equal=1)
            _dump_line(pos)
            oldpos = pos
    if oldpos is not None and pos is not None and pos != oldpos:
        _dump_line(pos, equal=1)
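
# Minimal manual smoke test, not part of xlrd's public interface: a sketch of how
# this module is typically driven.  It assumes the path of an .xls file is passed
# as the first command-line argument and that the module is run as
# ``python -m xlrd.compdoc file.xls`` so the relative import above resolves.
# Only CompDoc and get_named_stream from this file are used.  BIFF8 files usually
# name the stream 'Workbook'; older BIFF5/7 files use 'Book', so both are tried.
if __name__ == "__main__":
    with open(sys.argv[1], 'rb') as f:
        cd = CompDoc(f.read())
    for name in ('Workbook', 'Book'):
        data = cd.get_named_stream(name)
        if data is not None:
            print("%s stream: %d bytes" % (name, len(data)))
            break
    else:
        print("No Workbook or Book stream found")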