Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/charset_normalizer/api.py: 6%

206 statements  

coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

import logging
import warnings
from os import PathLike
from os.path import basename, splitext
from typing import Any, BinaryIO, List, Optional, Set

from .cd import (
    coherence_ratio,
    encoding_languages,
    mb_encoding_languages,
    merge_coherence_ratios,
)
from .constant import IANA_SUPPORTED, TOO_BIG_SEQUENCE, TOO_SMALL_SEQUENCE, TRACE
from .md import mess_ratio
from .models import CharsetMatch, CharsetMatches
from .utils import (
    any_specified_encoding,
    cut_sequence_chunks,
    iana_name,
    identify_sig_or_bom,
    is_cp_similar,
    is_multi_byte_encoding,
    should_strip_sig_or_bom,
)

# Will most likely be controversial
# logging.addLevelName(TRACE, "TRACE")
logger = logging.getLogger("charset_normalizer")
explain_handler = logging.StreamHandler()
explain_handler.setFormatter(
    logging.Formatter("%(asctime)s | %(levelname)s | %(message)s")
)
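

# Illustrative sketch (assumption, not part of the library): rather than passing
# explain=True to from_bytes below, a caller may wire up its own handler and
# format on the "charset_normalizer" logger, as the from_bytes docstring notes.
def _example_custom_logging() -> None:
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(name)s | %(levelname)s | %(message)s"))
    logger.addHandler(handler)
    logger.setLevel(logging.DEBUG)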

def from_bytes(
    sequences: bytes,
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.2,
    cp_isolation: Optional[List[str]] = None,
    cp_exclusion: Optional[List[str]] = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
) -> CharsetMatches:
    """
    Given a raw bytes sequence, return the best possible charsets usable to render str objects.
    If there are no results, it is a strong indicator that the source is binary/not text.
    By default, the process will extract 5 blocks of 512 bytes each to assess the mess and coherence of a given sequence,
    and will give up on a particular code page after 20% of measured mess. Those criteria are customizable at will.

    The preemptive behaviour DOES NOT replace the traditional detection workflow; it prioritizes a particular code page
    but never takes it for granted. It can improve performance.

    You may want to focus your attention on some code pages and/or ignore others; use cp_isolation and cp_exclusion for that
    purpose.

    This function will strip the SIG (BOM) from the payload/sequence every time except for UTF-16 and UTF-32.
    By default the library does not set up any handler other than the NullHandler. If you set the 'explain'
    toggle to True, the logger configuration is altered to add a StreamHandler suitable for debugging.
    A custom logging format and handler can be set manually.
    """


    if not isinstance(sequences, (bytearray, bytes)):
        raise TypeError(
            "Expected object of type bytes or bytearray, got: {0}".format(
                type(sequences)
            )
        )

    if explain:
        previous_logger_level: int = logger.level
        logger.addHandler(explain_handler)
        logger.setLevel(TRACE)

    length: int = len(sequences)

    if length == 0:
        logger.debug("Encoding detection on empty bytes, assuming utf_8 intention.")
        if explain:
            logger.removeHandler(explain_handler)
            logger.setLevel(previous_logger_level or logging.WARNING)
        return CharsetMatches([CharsetMatch(sequences, "utf_8", 0.0, False, [], "")])

    if cp_isolation is not None:
        logger.log(
            TRACE,
            "cp_isolation is set. use this flag for debugging purpose. "
            "limited list of encoding allowed : %s.",
            ", ".join(cp_isolation),
        )
        cp_isolation = [iana_name(cp, False) for cp in cp_isolation]
    else:
        cp_isolation = []

    if cp_exclusion is not None:
        logger.log(
            TRACE,
            "cp_exclusion is set. use this flag for debugging purpose. "
            "limited list of encoding excluded : %s.",
            ", ".join(cp_exclusion),
        )
        cp_exclusion = [iana_name(cp, False) for cp in cp_exclusion]
    else:
        cp_exclusion = []

    if length <= (chunk_size * steps):
        logger.log(
            TRACE,
            "override steps (%i) and chunk_size (%i) as content does not fit (%i byte(s) given) parameters.",
            steps,
            chunk_size,
            length,
        )
        steps = 1
        chunk_size = length

    if steps > 1 and length / steps < chunk_size:
        chunk_size = int(length / steps)

    is_too_small_sequence: bool = len(sequences) < TOO_SMALL_SEQUENCE
    is_too_large_sequence: bool = len(sequences) >= TOO_BIG_SEQUENCE

    if is_too_small_sequence:
        logger.log(
            TRACE,
            "Trying to detect encoding from a tiny portion of ({}) byte(s).".format(
                length
            ),
        )
    elif is_too_large_sequence:
        logger.log(
            TRACE,
            "Using lazy str decoding because the payload is quite large, ({}) byte(s).".format(
                length
            ),
        )

    prioritized_encodings: List[str] = []

    specified_encoding: Optional[str] = (
        any_specified_encoding(sequences) if preemptive_behaviour else None
    )

    if specified_encoding is not None:
        prioritized_encodings.append(specified_encoding)
        logger.log(
            TRACE,
            "Detected declarative mark in sequence. Priority +1 given for %s.",
            specified_encoding,
        )


    tested: Set[str] = set()
    tested_but_hard_failure: List[str] = []
    tested_but_soft_failure: List[str] = []

    fallback_ascii: Optional[CharsetMatch] = None
    fallback_u8: Optional[CharsetMatch] = None
    fallback_specified: Optional[CharsetMatch] = None

    results: CharsetMatches = CharsetMatches()

    sig_encoding, sig_payload = identify_sig_or_bom(sequences)

    if sig_encoding is not None:
        prioritized_encodings.append(sig_encoding)
        logger.log(
            TRACE,
            "Detected a SIG or BOM mark on first %i byte(s). Priority +1 given for %s.",
            len(sig_payload),
            sig_encoding,
        )

    prioritized_encodings.append("ascii")

    if "utf_8" not in prioritized_encodings:
        prioritized_encodings.append("utf_8")

    for encoding_iana in prioritized_encodings + IANA_SUPPORTED:

        if cp_isolation and encoding_iana not in cp_isolation:
            continue

        if cp_exclusion and encoding_iana in cp_exclusion:
            continue

        if encoding_iana in tested:
            continue

        tested.add(encoding_iana)

        decoded_payload: Optional[str] = None
        bom_or_sig_available: bool = sig_encoding == encoding_iana
        strip_sig_or_bom: bool = bom_or_sig_available and should_strip_sig_or_bom(
            encoding_iana
        )

        if encoding_iana in {"utf_16", "utf_32"} and not bom_or_sig_available:
            logger.log(
                TRACE,
                "Encoding %s wont be tested as-is because it require a BOM. Will try some sub-encoder LE/BE.",
                encoding_iana,
            )
            continue

        try:
            is_multi_byte_decoder: bool = is_multi_byte_encoding(encoding_iana)
        except (ModuleNotFoundError, ImportError):
            logger.log(
                TRACE,
                "Encoding %s does not provide an IncrementalDecoder",
                encoding_iana,
            )
            continue


        try:
            if is_too_large_sequence and is_multi_byte_decoder is False:
                str(
                    sequences[: int(50e4)]
                    if strip_sig_or_bom is False
                    else sequences[len(sig_payload) : int(50e4)],
                    encoding=encoding_iana,
                )
            else:
                decoded_payload = str(
                    sequences
                    if strip_sig_or_bom is False
                    else sequences[len(sig_payload) :],
                    encoding=encoding_iana,
                )
        except (UnicodeDecodeError, LookupError) as e:
            if not isinstance(e, LookupError):
                logger.log(
                    TRACE,
                    "Code page %s does not fit given bytes sequence at ALL. %s",
                    encoding_iana,
                    str(e),
                )
            tested_but_hard_failure.append(encoding_iana)
            continue

        similar_soft_failure_test: bool = False

        for encoding_soft_failed in tested_but_soft_failure:
            if is_cp_similar(encoding_iana, encoding_soft_failed):
                similar_soft_failure_test = True
                break

        if similar_soft_failure_test:
            logger.log(
                TRACE,
                "%s is deemed too similar to code page %s and was consider unsuited already. Continuing!",
                encoding_iana,
                encoding_soft_failed,
            )
            continue

        r_ = range(
            0 if not bom_or_sig_available else len(sig_payload),
            length,
            int(length / steps),
        )

        multi_byte_bonus: bool = (
            is_multi_byte_decoder
            and decoded_payload is not None
            and len(decoded_payload) < length
        )

        if multi_byte_bonus:
            logger.log(
                TRACE,
                "Code page %s is a multi byte encoding table and it appear that at least one character "
                "was encoded using n-bytes.",
                encoding_iana,
            )

        max_chunk_gave_up: int = int(len(r_) / 4)

        max_chunk_gave_up = max(max_chunk_gave_up, 2)
        early_stop_count: int = 0
        lazy_str_hard_failure = False

        md_chunks: List[str] = []
        md_ratios = []

        try:
            for chunk in cut_sequence_chunks(
                sequences,
                encoding_iana,
                r_,
                chunk_size,
                bom_or_sig_available,
                strip_sig_or_bom,
                sig_payload,
                is_multi_byte_decoder,
                decoded_payload,
            ):
                md_chunks.append(chunk)

                md_ratios.append(mess_ratio(chunk, threshold))

                if md_ratios[-1] >= threshold:
                    early_stop_count += 1

                if (early_stop_count >= max_chunk_gave_up) or (
                    bom_or_sig_available and strip_sig_or_bom is False
                ):
                    break
        except UnicodeDecodeError as e:  # Lazy str loading may have missed something there
            logger.log(
                TRACE,
                "LazyStr Loading: After MD chunk decode, code page %s does not fit given bytes sequence at ALL. %s",
                encoding_iana,
                str(e),
            )
            early_stop_count = max_chunk_gave_up
            lazy_str_hard_failure = True


        # We might want to check the sequence again with the whole content
        # Only if initial MD tests pass
        if (
            not lazy_str_hard_failure
            and is_too_large_sequence
            and not is_multi_byte_decoder
        ):
            try:
                sequences[int(50e3) :].decode(encoding_iana, errors="strict")
            except UnicodeDecodeError as e:
                logger.log(
                    TRACE,
                    "LazyStr Loading: After final lookup, code page %s does not fit given bytes sequence at ALL. %s",
                    encoding_iana,
                    str(e),
                )
                tested_but_hard_failure.append(encoding_iana)
                continue

        mean_mess_ratio: float = sum(md_ratios) / len(md_ratios) if md_ratios else 0.0
        if mean_mess_ratio >= threshold or early_stop_count >= max_chunk_gave_up:
            tested_but_soft_failure.append(encoding_iana)
            logger.log(
                TRACE,
                "%s was excluded because of initial chaos probing. Gave up %i time(s). "
                "Computed mean chaos is %f %%.",
                encoding_iana,
                early_stop_count,
                round(mean_mess_ratio * 100, ndigits=3),
            )
            # Preparing those fallbacks in case we got nothing.
            if (
                encoding_iana in ["ascii", "utf_8", specified_encoding]
                and not lazy_str_hard_failure
            ):
                fallback_entry = CharsetMatch(
                    sequences, encoding_iana, threshold, False, [], decoded_payload
                )
                if encoding_iana == specified_encoding:
                    fallback_specified = fallback_entry
                elif encoding_iana == "ascii":
                    fallback_ascii = fallback_entry
                else:
                    fallback_u8 = fallback_entry
            continue

        logger.log(
            TRACE,
            "%s passed initial chaos probing. Mean measured chaos is %f %%",
            encoding_iana,
            round(mean_mess_ratio * 100, ndigits=3),
        )


        if not is_multi_byte_decoder:
            target_languages: List[str] = encoding_languages(encoding_iana)
        else:
            target_languages = mb_encoding_languages(encoding_iana)

        if target_languages:
            logger.log(
                TRACE,
                "{} should target any language(s) of {}".format(
                    encoding_iana, str(target_languages)
                ),
            )

        cd_ratios = []

        # We shall skip the CD when it's about ASCII
        # Most of the time it's not relevant to run "language-detection" on it.
        if encoding_iana != "ascii":
            for chunk in md_chunks:
                chunk_languages = coherence_ratio(
                    chunk, 0.1, ",".join(target_languages) if target_languages else None
                )

                cd_ratios.append(chunk_languages)

        cd_ratios_merged = merge_coherence_ratios(cd_ratios)

        if cd_ratios_merged:
            logger.log(
                TRACE,
                "We detected language {} using {}".format(
                    cd_ratios_merged, encoding_iana
                ),
            )

        results.append(
            CharsetMatch(
                sequences,
                encoding_iana,
                mean_mess_ratio,
                bom_or_sig_available,
                cd_ratios_merged,
                decoded_payload,
            )
        )


        if (
            encoding_iana in [specified_encoding, "ascii", "utf_8"]
            and mean_mess_ratio < 0.1
        ):
            logger.debug(
                "Encoding detection: %s is most likely the one.", encoding_iana
            )
            if explain:
                logger.removeHandler(explain_handler)
                logger.setLevel(previous_logger_level)
            return CharsetMatches([results[encoding_iana]])

        if encoding_iana == sig_encoding:
            logger.debug(
                "Encoding detection: %s is most likely the one as we detected a BOM or SIG within "
                "the beginning of the sequence.",
                encoding_iana,
            )
            if explain:
                logger.removeHandler(explain_handler)
                logger.setLevel(previous_logger_level)
            return CharsetMatches([results[encoding_iana]])

    if len(results) == 0:
        if fallback_u8 or fallback_ascii or fallback_specified:
            logger.log(
                TRACE,
                "Nothing got out of the detection process. Using ASCII/UTF-8/Specified fallback.",
            )

        if fallback_specified:
            logger.debug(
                "Encoding detection: %s will be used as a fallback match",
                fallback_specified.encoding,
            )
            results.append(fallback_specified)
        elif (
            (fallback_u8 and fallback_ascii is None)
            or (
                fallback_u8
                and fallback_ascii
                and fallback_u8.fingerprint != fallback_ascii.fingerprint
            )
            or (fallback_u8 is not None)
        ):
            logger.debug("Encoding detection: utf_8 will be used as a fallback match")
            results.append(fallback_u8)
        elif fallback_ascii:
            logger.debug("Encoding detection: ascii will be used as a fallback match")
            results.append(fallback_ascii)

    if results:
        logger.debug(
            "Encoding detection: Found %s as plausible (best-candidate) for content. With %i alternatives.",
            results.best().encoding,  # type: ignore
            len(results) - 1,
        )
    else:
        logger.debug("Encoding detection: Unable to determine any suitable charset.")

    if explain:
        logger.removeHandler(explain_handler)
        logger.setLevel(previous_logger_level)

    return results


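# Illustrative usage sketch (assumption, not library code): detect the charset of
# a raw payload with from_bytes() and read the best match. The sample text and
# the helper name are invented for the example.
def _example_from_bytes() -> None:
    payload = "Comment ça va ?".encode("cp1252")
    best = from_bytes(payload).best()
    if best is None:
        print("no suitable charset found")
    else:
        print(best.encoding, str(best))

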

def from_fp(
    fp: BinaryIO,
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: Optional[List[str]] = None,
    cp_exclusion: Optional[List[str]] = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
) -> CharsetMatches:
    """
    Same as from_bytes, but uses a file pointer that is already opened and ready.
    Will not close the file pointer.
    """
    return from_bytes(
        fp.read(),
        steps,
        chunk_size,
        threshold,
        cp_isolation,
        cp_exclusion,
        preemptive_behaviour,
        explain,
    )


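# Illustrative usage sketch (assumption, not library code): from_fp accepts any
# already-open binary file object, including an in-memory io.BytesIO.
def _example_from_fp() -> None:
    from io import BytesIO

    best = from_fp(BytesIO("héllo wörld".encode("utf_8"))).best()
    if best is not None:
        print(best.encoding)

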

def from_path(
    path: "PathLike[Any]",
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: Optional[List[str]] = None,
    cp_exclusion: Optional[List[str]] = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
) -> CharsetMatches:
    """
    Same as from_bytes, but with one extra step: opening and reading the given file path in binary mode.
    Can raise IOError.
    """
    with open(path, "rb") as fp:
        return from_fp(
            fp,
            steps,
            chunk_size,
            threshold,
            cp_isolation,
            cp_exclusion,
            preemptive_behaviour,
            explain,
        )


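# Illustrative usage sketch (assumption, not library code): detect the charset of
# a file on disk; "./sample.txt" is a hypothetical path used only for the example.
def _example_from_path() -> None:
    best = from_path("./sample.txt").best()
    if best is not None:
        print(best.encoding)

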

def normalize(
    path: "PathLike[Any]",
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: Optional[List[str]] = None,
    cp_exclusion: Optional[List[str]] = None,
    preemptive_behaviour: bool = True,
) -> CharsetMatch:
    """
    Take a (text-based) file path and try to create another file next to it, this time using UTF-8.
    """
    warnings.warn(
        "normalize is deprecated and will be removed in 3.0",
        DeprecationWarning,
    )

    results = from_path(
        path,
        steps,
        chunk_size,
        threshold,
        cp_isolation,
        cp_exclusion,
        preemptive_behaviour,
    )

    filename = basename(path)
    target_extensions = list(splitext(filename))

    if len(results) == 0:
        raise IOError(
            'Unable to normalize "{}", no encoding charset seems to fit.'.format(
                filename
            )
        )

    result = results.best()

    target_extensions[0] += "-" + result.encoding  # type: ignore

    with open(
        "{}".format(str(path).replace(filename, "".join(target_extensions))), "wb"
    ) as fp:
        fp.write(result.output())  # type: ignore

    return result  # type: ignore
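

# Illustrative usage sketch (assumption, not library code): normalize() writes a
# UTF-8 copy next to the original, e.g. "sample.txt" -> "sample-cp1252.txt" when
# cp1252 is detected; note the DeprecationWarning it emits (see above).
def _example_normalize() -> None:
    result = normalize("./sample.txt")
    print("normalized copy written using", result.encoding)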