Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/charset_normalizer/api.py: 6%
206 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1import logging
2import warnings
3from os import PathLike
4from os.path import basename, splitext
5from typing import Any, BinaryIO, List, Optional, Set
7from .cd import (
8 coherence_ratio,
9 encoding_languages,
10 mb_encoding_languages,
11 merge_coherence_ratios,
12)
13from .constant import IANA_SUPPORTED, TOO_BIG_SEQUENCE, TOO_SMALL_SEQUENCE, TRACE
14from .md import mess_ratio
15from .models import CharsetMatch, CharsetMatches
16from .utils import (
17 any_specified_encoding,
18 cut_sequence_chunks,
19 iana_name,
20 identify_sig_or_bom,
21 is_cp_similar,
22 is_multi_byte_encoding,
23 should_strip_sig_or_bom,
24)
26# Will most likely be controversial
27# logging.addLevelName(TRACE, "TRACE")
28logger = logging.getLogger("charset_normalizer")
29explain_handler = logging.StreamHandler()
30explain_handler.setFormatter(
31 logging.Formatter("%(asctime)s | %(levelname)s | %(message)s")
32)
35def from_bytes(
36 sequences: bytes,
37 steps: int = 5,
38 chunk_size: int = 512,
39 threshold: float = 0.2,
40 cp_isolation: Optional[List[str]] = None,
41 cp_exclusion: Optional[List[str]] = None,
42 preemptive_behaviour: bool = True,
43 explain: bool = False,
44) -> CharsetMatches:
45 """
46 Given a raw bytes sequence, return the best possibles charset usable to render str objects.
47 If there is no results, it is a strong indicator that the source is binary/not text.
48 By default, the process will extract 5 blocs of 512o each to assess the mess and coherence of a given sequence.
49 And will give up a particular code page after 20% of measured mess. Those criteria are customizable at will.
51 The preemptive behavior DOES NOT replace the traditional detection workflow, it prioritize a particular code page
52 but never take it for granted. Can improve the performance.
54 You may want to focus your attention to some code page or/and not others, use cp_isolation and cp_exclusion for that
55 purpose.
57 This function will strip the SIG in the payload/sequence every time except on UTF-16, UTF-32.
58 By default the library does not setup any handler other than the NullHandler, if you choose to set the 'explain'
59 toggle to True it will alter the logger configuration to add a StreamHandler that is suitable for debugging.
60 Custom logging format and handler can be set manually.
61 """
63 if not isinstance(sequences, (bytearray, bytes)):
64 raise TypeError(
65 "Expected object of type bytes or bytearray, got: {0}".format(
66 type(sequences)
67 )
68 )
70 if explain:
71 previous_logger_level: int = logger.level
72 logger.addHandler(explain_handler)
73 logger.setLevel(TRACE)
75 length: int = len(sequences)
77 if length == 0:
78 logger.debug("Encoding detection on empty bytes, assuming utf_8 intention.")
79 if explain:
80 logger.removeHandler(explain_handler)
81 logger.setLevel(previous_logger_level or logging.WARNING)
82 return CharsetMatches([CharsetMatch(sequences, "utf_8", 0.0, False, [], "")])
84 if cp_isolation is not None:
85 logger.log(
86 TRACE,
87 "cp_isolation is set. use this flag for debugging purpose. "
88 "limited list of encoding allowed : %s.",
89 ", ".join(cp_isolation),
90 )
91 cp_isolation = [iana_name(cp, False) for cp in cp_isolation]
92 else:
93 cp_isolation = []
95 if cp_exclusion is not None:
96 logger.log(
97 TRACE,
98 "cp_exclusion is set. use this flag for debugging purpose. "
99 "limited list of encoding excluded : %s.",
100 ", ".join(cp_exclusion),
101 )
102 cp_exclusion = [iana_name(cp, False) for cp in cp_exclusion]
103 else:
104 cp_exclusion = []
106 if length <= (chunk_size * steps):
107 logger.log(
108 TRACE,
109 "override steps (%i) and chunk_size (%i) as content does not fit (%i byte(s) given) parameters.",
110 steps,
111 chunk_size,
112 length,
113 )
114 steps = 1
115 chunk_size = length
117 if steps > 1 and length / steps < chunk_size:
118 chunk_size = int(length / steps)
120 is_too_small_sequence: bool = len(sequences) < TOO_SMALL_SEQUENCE
121 is_too_large_sequence: bool = len(sequences) >= TOO_BIG_SEQUENCE
123 if is_too_small_sequence:
124 logger.log(
125 TRACE,
126 "Trying to detect encoding from a tiny portion of ({}) byte(s).".format(
127 length
128 ),
129 )
130 elif is_too_large_sequence:
131 logger.log(
132 TRACE,
133 "Using lazy str decoding because the payload is quite large, ({}) byte(s).".format(
134 length
135 ),
136 )
138 prioritized_encodings: List[str] = []
140 specified_encoding: Optional[str] = (
141 any_specified_encoding(sequences) if preemptive_behaviour else None
142 )
144 if specified_encoding is not None:
145 prioritized_encodings.append(specified_encoding)
146 logger.log(
147 TRACE,
148 "Detected declarative mark in sequence. Priority +1 given for %s.",
149 specified_encoding,
150 )
152 tested: Set[str] = set()
153 tested_but_hard_failure: List[str] = []
154 tested_but_soft_failure: List[str] = []
156 fallback_ascii: Optional[CharsetMatch] = None
157 fallback_u8: Optional[CharsetMatch] = None
158 fallback_specified: Optional[CharsetMatch] = None
160 results: CharsetMatches = CharsetMatches()
162 sig_encoding, sig_payload = identify_sig_or_bom(sequences)
164 if sig_encoding is not None:
165 prioritized_encodings.append(sig_encoding)
166 logger.log(
167 TRACE,
168 "Detected a SIG or BOM mark on first %i byte(s). Priority +1 given for %s.",
169 len(sig_payload),
170 sig_encoding,
171 )
173 prioritized_encodings.append("ascii")
175 if "utf_8" not in prioritized_encodings:
176 prioritized_encodings.append("utf_8")
178 for encoding_iana in prioritized_encodings + IANA_SUPPORTED:
180 if cp_isolation and encoding_iana not in cp_isolation:
181 continue
183 if cp_exclusion and encoding_iana in cp_exclusion:
184 continue
186 if encoding_iana in tested:
187 continue
189 tested.add(encoding_iana)
191 decoded_payload: Optional[str] = None
192 bom_or_sig_available: bool = sig_encoding == encoding_iana
193 strip_sig_or_bom: bool = bom_or_sig_available and should_strip_sig_or_bom(
194 encoding_iana
195 )
197 if encoding_iana in {"utf_16", "utf_32"} and not bom_or_sig_available:
198 logger.log(
199 TRACE,
200 "Encoding %s wont be tested as-is because it require a BOM. Will try some sub-encoder LE/BE.",
201 encoding_iana,
202 )
203 continue
205 try:
206 is_multi_byte_decoder: bool = is_multi_byte_encoding(encoding_iana)
207 except (ModuleNotFoundError, ImportError):
208 logger.log(
209 TRACE,
210 "Encoding %s does not provide an IncrementalDecoder",
211 encoding_iana,
212 )
213 continue
215 try:
216 if is_too_large_sequence and is_multi_byte_decoder is False:
217 str(
218 sequences[: int(50e4)]
219 if strip_sig_or_bom is False
220 else sequences[len(sig_payload) : int(50e4)],
221 encoding=encoding_iana,
222 )
223 else:
224 decoded_payload = str(
225 sequences
226 if strip_sig_or_bom is False
227 else sequences[len(sig_payload) :],
228 encoding=encoding_iana,
229 )
230 except (UnicodeDecodeError, LookupError) as e:
231 if not isinstance(e, LookupError):
232 logger.log(
233 TRACE,
234 "Code page %s does not fit given bytes sequence at ALL. %s",
235 encoding_iana,
236 str(e),
237 )
238 tested_but_hard_failure.append(encoding_iana)
239 continue
241 similar_soft_failure_test: bool = False
243 for encoding_soft_failed in tested_but_soft_failure:
244 if is_cp_similar(encoding_iana, encoding_soft_failed):
245 similar_soft_failure_test = True
246 break
248 if similar_soft_failure_test:
249 logger.log(
250 TRACE,
251 "%s is deemed too similar to code page %s and was consider unsuited already. Continuing!",
252 encoding_iana,
253 encoding_soft_failed,
254 )
255 continue
257 r_ = range(
258 0 if not bom_or_sig_available else len(sig_payload),
259 length,
260 int(length / steps),
261 )
263 multi_byte_bonus: bool = (
264 is_multi_byte_decoder
265 and decoded_payload is not None
266 and len(decoded_payload) < length
267 )
269 if multi_byte_bonus:
270 logger.log(
271 TRACE,
272 "Code page %s is a multi byte encoding table and it appear that at least one character "
273 "was encoded using n-bytes.",
274 encoding_iana,
275 )
277 max_chunk_gave_up: int = int(len(r_) / 4)
279 max_chunk_gave_up = max(max_chunk_gave_up, 2)
280 early_stop_count: int = 0
281 lazy_str_hard_failure = False
283 md_chunks: List[str] = []
284 md_ratios = []
286 try:
287 for chunk in cut_sequence_chunks(
288 sequences,
289 encoding_iana,
290 r_,
291 chunk_size,
292 bom_or_sig_available,
293 strip_sig_or_bom,
294 sig_payload,
295 is_multi_byte_decoder,
296 decoded_payload,
297 ):
298 md_chunks.append(chunk)
300 md_ratios.append(mess_ratio(chunk, threshold))
302 if md_ratios[-1] >= threshold:
303 early_stop_count += 1
305 if (early_stop_count >= max_chunk_gave_up) or (
306 bom_or_sig_available and strip_sig_or_bom is False
307 ):
308 break
309 except UnicodeDecodeError as e: # Lazy str loading may have missed something there
310 logger.log(
311 TRACE,
312 "LazyStr Loading: After MD chunk decode, code page %s does not fit given bytes sequence at ALL. %s",
313 encoding_iana,
314 str(e),
315 )
316 early_stop_count = max_chunk_gave_up
317 lazy_str_hard_failure = True
319 # We might want to check the sequence again with the whole content
320 # Only if initial MD tests passes
321 if (
322 not lazy_str_hard_failure
323 and is_too_large_sequence
324 and not is_multi_byte_decoder
325 ):
326 try:
327 sequences[int(50e3) :].decode(encoding_iana, errors="strict")
328 except UnicodeDecodeError as e:
329 logger.log(
330 TRACE,
331 "LazyStr Loading: After final lookup, code page %s does not fit given bytes sequence at ALL. %s",
332 encoding_iana,
333 str(e),
334 )
335 tested_but_hard_failure.append(encoding_iana)
336 continue
338 mean_mess_ratio: float = sum(md_ratios) / len(md_ratios) if md_ratios else 0.0
339 if mean_mess_ratio >= threshold or early_stop_count >= max_chunk_gave_up:
340 tested_but_soft_failure.append(encoding_iana)
341 logger.log(
342 TRACE,
343 "%s was excluded because of initial chaos probing. Gave up %i time(s). "
344 "Computed mean chaos is %f %%.",
345 encoding_iana,
346 early_stop_count,
347 round(mean_mess_ratio * 100, ndigits=3),
348 )
349 # Preparing those fallbacks in case we got nothing.
350 if (
351 encoding_iana in ["ascii", "utf_8", specified_encoding]
352 and not lazy_str_hard_failure
353 ):
354 fallback_entry = CharsetMatch(
355 sequences, encoding_iana, threshold, False, [], decoded_payload
356 )
357 if encoding_iana == specified_encoding:
358 fallback_specified = fallback_entry
359 elif encoding_iana == "ascii":
360 fallback_ascii = fallback_entry
361 else:
362 fallback_u8 = fallback_entry
363 continue
365 logger.log(
366 TRACE,
367 "%s passed initial chaos probing. Mean measured chaos is %f %%",
368 encoding_iana,
369 round(mean_mess_ratio * 100, ndigits=3),
370 )
372 if not is_multi_byte_decoder:
373 target_languages: List[str] = encoding_languages(encoding_iana)
374 else:
375 target_languages = mb_encoding_languages(encoding_iana)
377 if target_languages:
378 logger.log(
379 TRACE,
380 "{} should target any language(s) of {}".format(
381 encoding_iana, str(target_languages)
382 ),
383 )
385 cd_ratios = []
387 # We shall skip the CD when its about ASCII
388 # Most of the time its not relevant to run "language-detection" on it.
389 if encoding_iana != "ascii":
390 for chunk in md_chunks:
391 chunk_languages = coherence_ratio(
392 chunk, 0.1, ",".join(target_languages) if target_languages else None
393 )
395 cd_ratios.append(chunk_languages)
397 cd_ratios_merged = merge_coherence_ratios(cd_ratios)
399 if cd_ratios_merged:
400 logger.log(
401 TRACE,
402 "We detected language {} using {}".format(
403 cd_ratios_merged, encoding_iana
404 ),
405 )
407 results.append(
408 CharsetMatch(
409 sequences,
410 encoding_iana,
411 mean_mess_ratio,
412 bom_or_sig_available,
413 cd_ratios_merged,
414 decoded_payload,
415 )
416 )
418 if (
419 encoding_iana in [specified_encoding, "ascii", "utf_8"]
420 and mean_mess_ratio < 0.1
421 ):
422 logger.debug(
423 "Encoding detection: %s is most likely the one.", encoding_iana
424 )
425 if explain:
426 logger.removeHandler(explain_handler)
427 logger.setLevel(previous_logger_level)
428 return CharsetMatches([results[encoding_iana]])
430 if encoding_iana == sig_encoding:
431 logger.debug(
432 "Encoding detection: %s is most likely the one as we detected a BOM or SIG within "
433 "the beginning of the sequence.",
434 encoding_iana,
435 )
436 if explain:
437 logger.removeHandler(explain_handler)
438 logger.setLevel(previous_logger_level)
439 return CharsetMatches([results[encoding_iana]])
441 if len(results) == 0:
442 if fallback_u8 or fallback_ascii or fallback_specified:
443 logger.log(
444 TRACE,
445 "Nothing got out of the detection process. Using ASCII/UTF-8/Specified fallback.",
446 )
448 if fallback_specified:
449 logger.debug(
450 "Encoding detection: %s will be used as a fallback match",
451 fallback_specified.encoding,
452 )
453 results.append(fallback_specified)
454 elif (
455 (fallback_u8 and fallback_ascii is None)
456 or (
457 fallback_u8
458 and fallback_ascii
459 and fallback_u8.fingerprint != fallback_ascii.fingerprint
460 )
461 or (fallback_u8 is not None)
462 ):
463 logger.debug("Encoding detection: utf_8 will be used as a fallback match")
464 results.append(fallback_u8)
465 elif fallback_ascii:
466 logger.debug("Encoding detection: ascii will be used as a fallback match")
467 results.append(fallback_ascii)
469 if results:
470 logger.debug(
471 "Encoding detection: Found %s as plausible (best-candidate) for content. With %i alternatives.",
472 results.best().encoding, # type: ignore
473 len(results) - 1,
474 )
475 else:
476 logger.debug("Encoding detection: Unable to determine any suitable charset.")
478 if explain:
479 logger.removeHandler(explain_handler)
480 logger.setLevel(previous_logger_level)
482 return results
485def from_fp(
486 fp: BinaryIO,
487 steps: int = 5,
488 chunk_size: int = 512,
489 threshold: float = 0.20,
490 cp_isolation: Optional[List[str]] = None,
491 cp_exclusion: Optional[List[str]] = None,
492 preemptive_behaviour: bool = True,
493 explain: bool = False,
494) -> CharsetMatches:
495 """
496 Same thing than the function from_bytes but using a file pointer that is already ready.
497 Will not close the file pointer.
498 """
499 return from_bytes(
500 fp.read(),
501 steps,
502 chunk_size,
503 threshold,
504 cp_isolation,
505 cp_exclusion,
506 preemptive_behaviour,
507 explain,
508 )
511def from_path(
512 path: "PathLike[Any]",
513 steps: int = 5,
514 chunk_size: int = 512,
515 threshold: float = 0.20,
516 cp_isolation: Optional[List[str]] = None,
517 cp_exclusion: Optional[List[str]] = None,
518 preemptive_behaviour: bool = True,
519 explain: bool = False,
520) -> CharsetMatches:
521 """
522 Same thing than the function from_bytes but with one extra step. Opening and reading given file path in binary mode.
523 Can raise IOError.
524 """
525 with open(path, "rb") as fp:
526 return from_fp(
527 fp,
528 steps,
529 chunk_size,
530 threshold,
531 cp_isolation,
532 cp_exclusion,
533 preemptive_behaviour,
534 explain,
535 )
538def normalize(
539 path: "PathLike[Any]",
540 steps: int = 5,
541 chunk_size: int = 512,
542 threshold: float = 0.20,
543 cp_isolation: Optional[List[str]] = None,
544 cp_exclusion: Optional[List[str]] = None,
545 preemptive_behaviour: bool = True,
546) -> CharsetMatch:
547 """
548 Take a (text-based) file path and try to create another file next to it, this time using UTF-8.
549 """
550 warnings.warn(
551 "normalize is deprecated and will be removed in 3.0",
552 DeprecationWarning,
553 )
555 results = from_path(
556 path,
557 steps,
558 chunk_size,
559 threshold,
560 cp_isolation,
561 cp_exclusion,
562 preemptive_behaviour,
563 )
565 filename = basename(path)
566 target_extensions = list(splitext(filename))
568 if len(results) == 0:
569 raise IOError(
570 'Unable to normalize "{}", no encoding charset seems to fit.'.format(
571 filename
572 )
573 )
575 result = results.best()
577 target_extensions[0] += "-" + result.encoding # type: ignore
579 with open(
580 "{}".format(str(path).replace(filename, "".join(target_extensions))), "wb"
581 ) as fp:
582 fp.write(result.output()) # type: ignore
584 return result # type: ignore