Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/git/config.py: 21%

438 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1# config.py 

2# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors 

3# 

4# This module is part of GitPython and is released under 

5# the BSD License: http://www.opensource.org/licenses/bsd-license.php 

6"""Module containing module parser implementation able to properly read and write 

7configuration files""" 

8 

9import sys 

10import abc 

11from functools import wraps 

12import inspect 

13from io import BufferedReader, IOBase 

14import logging 

15import os 

16import re 

17import fnmatch 

18 

19from git.compat import ( 

20 defenc, 

21 force_text, 

22 is_win, 

23) 

24 

25from git.util import LockFile 

26 

27import os.path as osp 

28 

29import configparser as cp 

30 

31# typing------------------------------------------------------- 

32 

33from typing import ( 

34 Any, 

35 Callable, 

36 Generic, 

37 IO, 

38 List, 

39 Dict, 

40 Sequence, 

41 TYPE_CHECKING, 

42 Tuple, 

43 TypeVar, 

44 Union, 

45 cast, 

46) 

47 

48from git.types import Lit_config_levels, ConfigLevels_Tup, PathLike, assert_never, _T 

49 

50if TYPE_CHECKING: 50 ↛ 51line 50 didn't jump to line 51, because the condition on line 50 was never true

51 from git.repo.base import Repo 

52 from io import BytesIO 

53 

54T_ConfigParser = TypeVar("T_ConfigParser", bound="GitConfigParser") 

55T_OMD_value = TypeVar("T_OMD_value", str, bytes, int, float, bool) 

56 

57if sys.version_info[:3] < (3, 7, 2): 57 ↛ 59line 57 didn't jump to line 59, because the condition on line 57 was never true

58 # typing.Ordereddict not added until py 3.7.2 

59 from collections import OrderedDict 

60 

61 OrderedDict_OMD = OrderedDict 

62else: 

63 from typing import OrderedDict 

64 

65 OrderedDict_OMD = OrderedDict[str, List[T_OMD_value]] # type: ignore[assignment, misc] 

66 

67# ------------------------------------------------------------- 

68 

# Public names exported by this module.
69__all__ = ("GitConfigParser", "SectionConstraint") 

70 

71 

# Module-level logger; a NullHandler is attached to it right below.
72log = logging.getLogger("git.config") 

73log.addHandler(logging.NullHandler()) 

74 

75# invariants 

76# represents the configuration level of a configuration file 

77 

78 

# Names of the supported configuration levels.
79CONFIG_LEVELS: ConfigLevels_Tup = ("system", "user", "global", "repository") 

80 

81 

82# Section pattern to detect conditional includes. 

83# https://git-scm.com/docs/git-config#_conditional_includes 

# Group 1 captures the condition keyword (gitdir, gitdir/i or onbranch),
# group 2 captures the pattern that follows the colon.
84CONDITIONAL_INCLUDE_REGEXP = re.compile(r"(?<=includeIf )\"(gitdir|gitdir/i|onbranch):(.+)\"") 

85 

86 

class MetaParserBuilder(abc.ABCMeta):  # noqa: B024
    """Metaclass wrapping inherited base-class methods into decorators.

    The wrapping only happens when the class body defines ``_mutating_methods_``.
    """

    def __new__(cls, name: str, bases: Tuple, clsdict: Dict[str, Any]) -> "MetaParserBuilder":
        """Create the class, decorating public routines inherited from ``bases``.

        Every public routine (name not starting with ``_``) found on a base and
        not overridden in ``clsdict`` is wrapped with ``needs_values``; routines
        listed in ``_mutating_methods_`` are additionally wrapped with
        ``set_dirty_and_flush_changes``.

        :param name: Name of the class being created
        :param bases: Base classes of the class being created
        :param clsdict: Namespace of the class body; wrapped methods are added to it
        :return: The newly created class
        """
        kmm = "_mutating_methods_"
        if kmm in clsdict:
            mutating_methods = clsdict[kmm]
            for base in bases:
                methods = (t for t in inspect.getmembers(base, inspect.isroutine) if not t[0].startswith("_"))
                # A dedicated loop variable is required here: reusing ``name``
                # would overwrite the class name handed to ``super().__new__``
                # below, creating the class under the last method's name.
                for method_name, method in methods:
                    if method_name in clsdict:
                        continue
                    method_with_values = needs_values(method)
                    if method_name in mutating_methods:
                        method_with_values = set_dirty_and_flush_changes(method_with_values)
                    # END mutating methods handling

                    clsdict[method_name] = method_with_values
                # END for each name/method pair
            # END for each base
        # END if mutating methods configuration is set

        new_type = super(MetaParserBuilder, cls).__new__(cls, name, bases, clsdict)
        return new_type

114 

115 

def needs_values(func: Callable[..., _T]) -> Callable[..., _T]:
    """Wrap ``func`` so the instance's ``read()`` is called before ``func`` runs.

    :param func: Method to wrap; it receives the instance as first argument
    :return: Wrapper carrying the metadata of ``func``
    """

    @wraps(func)
    def _read_then_call(self: "GitConfigParser", *args: Any, **kwargs: Any) -> _T:
        # Make sure the data was read (on demand) before it is accessed.
        self.read()
        result = func(self, *args, **kwargs)
        return result

    return _read_then_call

126 

127 

def set_dirty_and_flush_changes(non_const_func: Callable[..., _T]) -> Callable[..., _T]:
    """Wrap a mutating method so the instance is marked dirty and written afterwards.

    The returned method calls ``non_const_func``, sets ``self._dirty`` to True,
    calls ``self.write()`` and finally returns the result of ``non_const_func``.

    :param non_const_func: Method that changes the instance
    :return: Wrapper carrying the metadata (name, docstring, ...) of ``non_const_func``
    """

    # ``wraps`` copies __name__ like the former manual assignment did, and in
    # addition keeps __doc__, __qualname__ and __module__ of the wrapped method.
    @wraps(non_const_func)
    def flush_changes(self: "GitConfigParser", *args: Any, **kwargs: Any) -> _T:
        rval = non_const_func(self, *args, **kwargs)
        self._dirty = True
        self.write()
        return rval

    # END wrapper method
    return flush_changes

142 

143 

class SectionConstraint(Generic[T_ConfigParser]):
    """Proxy that restricts a ConfigParser to option commands on one fixed section.

    Access to any name listed in ``_valid_attrs_`` yields a callable which
    forwards to the wrapped parser with the section name as first argument.

    :note:
        If used as a context manager, will release the wrapped ConfigParser."""

    __slots__ = ("_config", "_section_name")
    _valid_attrs_ = (
        "get_value",
        "set_value",
        "get",
        "set",
        "getint",
        "getfloat",
        "getboolean",
        "has_option",
        "remove_section",
        "remove_option",
        "options",
    )

    def __init__(self, config: T_ConfigParser, section: str) -> None:
        self._section_name = section
        self._config = config

    def __del__(self) -> None:
        # The wrapped parser is released explicitly when this proxy is destroyed.
        self._config.release()

    def __getattr__(self, attr: str) -> Any:
        # Anything that is not a forwarded command gets the regular lookup.
        if attr not in self._valid_attrs_:
            return super(SectionConstraint, self).__getattribute__(attr)
        return lambda *args, **kwargs: self._call_config(attr, *args, **kwargs)

    def _call_config(self, method: str, *args: Any, **kwargs: Any) -> Any:
        """Call ``method`` on the wrapped parser, passing our section name as
        first argument"""
        bound = getattr(self._config, method)
        return bound(self._section_name, *args, **kwargs)

    @property
    def config(self) -> T_ConfigParser:
        """return: Configparser instance we constrain"""
        return self._config

    def release(self) -> None:
        """Call ``release()`` on the wrapped parser instance and return its result"""
        return self._config.release()

    def __enter__(self) -> "SectionConstraint[T_ConfigParser]":
        self._config.__enter__()
        return self

    def __exit__(self, exception_type: str, exception_value: str, traceback: str) -> None:
        self._config.__exit__(exception_type, exception_value, traceback)

204 

205 

class _OMD(OrderedDict_OMD):
    """Ordered multi-dict: every key maps to a list of values, and plain item
    access works on the last value of that list."""

    def __setitem__(self, key: str, value: _T) -> None:
        # Assigning replaces all stored values by a single one.
        super().__setitem__(key, [value])

    def add(self, key: str, value: Any) -> None:
        """Append ``value`` to the values of ``key``, creating the key if needed."""
        if key in self:
            super().__getitem__(key).append(value)
        else:
            super().__setitem__(key, [value])

    def setall(self, key: str, values: List[_T]) -> None:
        """Store ``values`` as the complete value list of ``key``."""
        super().__setitem__(key, values)

    def __getitem__(self, key: str) -> Any:
        stored = super().__getitem__(key)
        return stored[-1]

    def getlast(self, key: str) -> Any:
        """Return the last value stored for ``key``."""
        stored = super().__getitem__(key)
        return stored[-1]

    def setlast(self, key: str, value: Any) -> None:
        """Replace the last value of ``key``, creating the key if needed."""
        if key in self:
            super().__getitem__(key)[-1] = value
        else:
            super().__setitem__(key, [value])

    def get(self, key: str, default: Union[_T, None] = None) -> Union[_T, None]:
        """Return the last value of ``key``, or ``default`` if the key is missing."""
        stored = super().get(key, [default])
        return stored[-1]

    def getall(self, key: str) -> List[_T]:
        """Return the list of all values stored for ``key``."""
        return super().__getitem__(key)

    def items(self) -> List[Tuple[str, _T]]:  # type: ignore[override]
        """List of (key, last value for key)."""
        pairs = []
        for k in self:
            pairs.append((k, self[k]))
        return pairs

    def items_all(self) -> List[Tuple[str, List[_T]]]:
        """List of (key, list of values for key)."""
        pairs = []
        for k in self:
            pairs.append((k, self.getall(k)))
        return pairs

248 

249 

def get_config_path(config_level: Lit_config_levels) -> str:
    """Return the path of the configuration file for ``config_level``.

    :param config_level: One of "system", "user" or "global"
    :return: Path to the configuration file of that level
    :raise ValueError: for "repository", as no repository is known here
    """
    # An absolute path of the system gitconfig is not supported on Windows,
    # so the global configuration is used instead.
    if is_win and config_level == "system":
        config_level = "global"

    if config_level == "system":
        return "/etc/gitconfig"

    if config_level == "user":
        config_home = os.environ.get("XDG_CONFIG_HOME")
        if not config_home:
            config_home = osp.join(os.environ.get("HOME", "~"), ".config")
        user_config = osp.join(config_home, "git", "config")
        return osp.normpath(osp.expanduser(user_config))

    if config_level == "global":
        return osp.normpath(osp.expanduser("~/.gitconfig"))

    if config_level == "repository":
        raise ValueError("No repo to get repository configuration from. Use Repo._get_config_path")

    # Should not be reached; assert_never is handed a ValueError for this case.
    assert_never(
        config_level,  # type: ignore[unreachable]
        ValueError(f"Invalid configuration level: {config_level!r}"),
    )

272 

273 

class GitConfigParser(cp.RawConfigParser, metaclass=MetaParserBuilder):
    """Parser implementing the specifics required to read git style configuration files.

    The configuration is read on demand from the file(s) given during
    initialization.

    Pending changes are written when the instance is released, which also
    happens on destruction, but writing can be triggered manually as well.

    A lock on the configuration file is obtained if the parser is not
    read-only.

    :note:
        The config is case-sensitive even when queried, hence section and option names
        must match perfectly.
        If used as a context manager, will release the locked file."""

    # { Configuration
    # Type of lock used by parsers that are not read-only. It must be
    # compatible to the LockFile interface; BlockingLockFile would be a
    # suitable alternative.
    t_lock = LockFile
    re_comment = re.compile(r"^\s*[#;]")

    # } END configuration

    # Temporary helper holding the shared option pattern; removed again below.
    _option_source = r"\s*(?P<option>[^:=\s][^:=]*)"
    OPTCRE = re.compile(_option_source + r"\s*(?P<vi>[:=])\s*" + r"(?P<value>.*)$")
    OPTVALUEONLY = re.compile(_option_source)
    del _option_source

    # list of RawConfigParser methods able to change the instance
    _mutating_methods_ = ("add_section", "remove_section", "remove_option", "set")

311 

312 def __init__( 

313 self, 

314 file_or_files: Union[None, PathLike, "BytesIO", Sequence[Union[PathLike, "BytesIO"]]] = None, 

315 read_only: bool = True, 

316 merge_includes: bool = True, 

317 config_level: Union[Lit_config_levels, None] = None, 

318 repo: Union["Repo", None] = None, 

319 ) -> None: 

320 """Initialize a configuration reader to read the given file_or_files and to 

321 possibly allow changes to it by setting read_only False 

322 

323 :param file_or_files: 

324 A single file path or file objects or multiple of these 

325 

326 :param read_only: 

327 If True, the ConfigParser may only read the data , but not change it. 

328 If False, only a single file path or file object may be given. We will write back the changes 

329 when they happen, or when the ConfigParser is released. This will not happen if other 

330 configuration files have been included 

331 :param merge_includes: if True, we will read files mentioned in [include] sections and merge their 

332 contents into ours. This makes it impossible to write back an individual configuration file. 

333 Thus, if you want to modify a single configuration file, turn this off to leave the original 

334 dataset unaltered when reading it. 

335 :param repo: Reference to repository to use if [includeIf] sections are found in configuration files. 

336 

337 """ 

338 cp.RawConfigParser.__init__(self, dict_type=_OMD) 

339 self._dict: Callable[..., _OMD] # type: ignore # mypy/typeshed bug? 

340 self._defaults: _OMD 

341 self._sections: _OMD # type: ignore # mypy/typeshed bug? 

342 

343 # Used in python 3, needs to stay in sync with sections for underlying implementation to work 

344 if not hasattr(self, "_proxies"): 

345 self._proxies = self._dict() 

346 

347 if file_or_files is not None: 

348 self._file_or_files: Union[PathLike, "BytesIO", Sequence[Union[PathLike, "BytesIO"]]] = file_or_files 

349 else: 

350 if config_level is None: 

351 if read_only: 

352 self._file_or_files = [ 

353 get_config_path(cast(Lit_config_levels, f)) for f in CONFIG_LEVELS if f != "repository" 

354 ] 

355 else: 

356 raise ValueError("No configuration level or configuration files specified") 

357 else: 

358 self._file_or_files = [get_config_path(config_level)] 

359 

360 self._read_only = read_only 

361 self._dirty = False 

362 self._is_initialized = False 

363 self._merge_includes = merge_includes 

364 self._repo = repo 

365 self._lock: Union["LockFile", None] = None 

366 self._acquire_lock() 

367 

368 def _acquire_lock(self) -> None: 

369 if not self._read_only: 

370 if not self._lock: 

371 if isinstance(self._file_or_files, (str, os.PathLike)): 

372 file_or_files = self._file_or_files 

373 elif isinstance(self._file_or_files, (tuple, list, Sequence)): 

374 raise ValueError( 

375 "Write-ConfigParsers can operate on a single file only, multiple files have been passed" 

376 ) 

377 else: 

378 file_or_files = self._file_or_files.name 

379 

380 # END get filename from handle/stream 

381 # initialize lock base - we want to write 

382 self._lock = self.t_lock(file_or_files) 

383 # END lock check 

384 

385 self._lock._obtain_lock() 

386 # END read-only check 

387 

388 def __del__(self) -> None: 

389 """Write pending changes if required and release locks""" 

390 # NOTE: only consistent in PY2 

391 self.release() 

392 

393 def __enter__(self) -> "GitConfigParser": 

394 self._acquire_lock() 

395 return self 

396 

397 def __exit__(self, *args: Any) -> None: 

398 self.release() 

399 

400 def release(self) -> None: 

401 """Flush changes and release the configuration write lock. This instance must not be used anymore afterwards. 

402 In Python 3, it's required to explicitly release locks and flush changes, as __del__ is not called 

403 deterministically anymore.""" 

404 # checking for the lock here makes sure we do not raise during write() 

405 # in case an invalid parser was created who could not get a lock 

406 if self.read_only or (self._lock and not self._lock._has_lock()): 

407 return 

408 

409 try: 

410 try: 

411 self.write() 

412 except IOError: 

413 log.error("Exception during destruction of GitConfigParser", exc_info=True) 

414 except ReferenceError: 

415 # This happens in PY3 ... and usually means that some state cannot be written 

416 # as the sections dict cannot be iterated 

417 # Usually when shutting down the interpreter, don'y know how to fix this 

418 pass 

419 finally: 

420 if self._lock is not None: 

421 self._lock._release_lock() 

422 

423 def optionxform(self, optionstr: str) -> str: 

424 """Do not transform options in any way when writing""" 

425 return optionstr 

426 

427 def _read(self, fp: Union[BufferedReader, IO[bytes]], fpname: str) -> None: 

428 """A direct copy of the py2.4 version of the super class's _read method 

429 to assure it uses ordered dicts. Had to change one line to make it work. 

430 

431 Future versions have this fixed, but in fact its quite embarrassing for the 

432 guys not to have done it right in the first place ! 

433 

434 Removed big comments to make it more compact. 

435 

436 Made sure it ignores initial whitespace as git uses tabs""" 

437 cursect = None # None, or a dictionary 

438 optname = None 

439 lineno = 0 

440 is_multi_line = False 

441 e = None # None, or an exception 

442 

443 def string_decode(v: str) -> str: 

444 if v[-1] == "\\": 

445 v = v[:-1] 

446 # end cut trailing escapes to prevent decode error 

447 

448 return v.encode(defenc).decode("unicode_escape") 

449 # end 

450 

451 # end 

452 

453 while True: 

454 # we assume to read binary ! 

455 line = fp.readline().decode(defenc) 

456 if not line: 

457 break 

458 lineno = lineno + 1 

459 # comment or blank line? 

460 if line.strip() == "" or self.re_comment.match(line): 

461 continue 

462 if line.split(None, 1)[0].lower() == "rem" and line[0] in "rR": 

463 # no leading whitespace 

464 continue 

465 

466 # is it a section header? 

467 mo = self.SECTCRE.match(line.strip()) 

468 if not is_multi_line and mo: 

469 sectname: str = mo.group("header").strip() 

470 if sectname in self._sections: 

471 cursect = self._sections[sectname] 

472 elif sectname == cp.DEFAULTSECT: 

473 cursect = self._defaults 

474 else: 

475 cursect = self._dict((("__name__", sectname),)) 

476 self._sections[sectname] = cursect 

477 self._proxies[sectname] = None 

478 # So sections can't start with a continuation line 

479 optname = None 

480 # no section header in the file? 

481 elif cursect is None: 

482 raise cp.MissingSectionHeaderError(fpname, lineno, line) 

483 # an option line? 

484 elif not is_multi_line: 

485 mo = self.OPTCRE.match(line) 

486 if mo: 

487 # We might just have handled the last line, which could contain a quotation we want to remove 

488 optname, vi, optval = mo.group("option", "vi", "value") 

489 if vi in ("=", ":") and ";" in optval and not optval.strip().startswith('"'): 

490 pos = optval.find(";") 

491 if pos != -1 and optval[pos - 1].isspace(): 

492 optval = optval[:pos] 

493 optval = optval.strip() 

494 if optval == '""': 

495 optval = "" 

496 # end handle empty string 

497 optname = self.optionxform(optname.rstrip()) 

498 if len(optval) > 1 and optval[0] == '"' and optval[-1] != '"': 

499 is_multi_line = True 

500 optval = string_decode(optval[1:]) 

501 # end handle multi-line 

502 # preserves multiple values for duplicate optnames 

503 cursect.add(optname, optval) 

504 else: 

505 # check if it's an option with no value - it's just ignored by git 

506 if not self.OPTVALUEONLY.match(line): 

507 if not e: 

508 e = cp.ParsingError(fpname) 

509 e.append(lineno, repr(line)) 

510 continue 

511 else: 

512 line = line.rstrip() 

513 if line.endswith('"'): 

514 is_multi_line = False 

515 line = line[:-1] 

516 # end handle quotations 

517 optval = cursect.getlast(optname) 

518 cursect.setlast(optname, optval + string_decode(line)) 

519 # END parse section or option 

520 # END while reading 

521 

522 # if any parsing errors occurred, raise an exception 

523 if e: 

524 raise e 

525 

526 def _has_includes(self) -> Union[bool, int]: 

527 return self._merge_includes and len(self._included_paths()) 

528 

529 def _included_paths(self) -> List[Tuple[str, str]]: 

530 """Return List all paths that must be included to configuration 

531 as Tuples of (option, value). 

532 """ 

533 paths = [] 

534 

535 for section in self.sections(): 

536 if section == "include": 

537 paths += self.items(section) 

538 

539 match = CONDITIONAL_INCLUDE_REGEXP.search(section) 

540 if match is None or self._repo is None: 

541 continue 

542 

543 keyword = match.group(1) 

544 value = match.group(2).strip() 

545 

546 if keyword in ["gitdir", "gitdir/i"]: 

547 value = osp.expanduser(value) 

548 

549 if not any(value.startswith(s) for s in ["./", "/"]): 

550 value = "**/" + value 

551 if value.endswith("/"): 

552 value += "**" 

553 

554 # Ensure that glob is always case insensitive if required. 

555 if keyword.endswith("/i"): 

556 value = re.sub( 

557 r"[a-zA-Z]", 

558 lambda m: "[{}{}]".format(m.group().lower(), m.group().upper()), 

559 value, 

560 ) 

561 if self._repo.git_dir: 

562 if fnmatch.fnmatchcase(str(self._repo.git_dir), value): 

563 paths += self.items(section) 

564 

565 elif keyword == "onbranch": 

566 try: 

567 branch_name = self._repo.active_branch.name 

568 except TypeError: 

569 # Ignore section if active branch cannot be retrieved. 

570 continue 

571 

572 if fnmatch.fnmatchcase(branch_name, value): 

573 paths += self.items(section) 

574 

575 return paths 

576 

577 def read(self) -> None: # type: ignore[override] 

578 """Reads the data stored in the files we have been initialized with. It will 

579 ignore files that cannot be read, possibly leaving an empty configuration 

580 

581 :return: Nothing 

582 :raise IOError: if a file cannot be handled""" 

583 if self._is_initialized: 

584 return None 

585 self._is_initialized = True 

586 

587 files_to_read: List[Union[PathLike, IO]] = [""] 

588 if isinstance(self._file_or_files, (str, os.PathLike)): 

589 # for str or Path, as str is a type of Sequence 

590 files_to_read = [self._file_or_files] 

591 elif not isinstance(self._file_or_files, (tuple, list, Sequence)): 

592 # could merge with above isinstance once runtime type known 

593 files_to_read = [self._file_or_files] 

594 else: # for lists or tuples 

595 files_to_read = list(self._file_or_files) 

596 # end assure we have a copy of the paths to handle 

597 

598 seen = set(files_to_read) 

599 num_read_include_files = 0 

600 while files_to_read: 

601 file_path = files_to_read.pop(0) 

602 file_ok = False 

603 

604 if hasattr(file_path, "seek"): 

605 # must be a file objectfile-object 

606 file_path = cast(IO[bytes], file_path) # replace with assert to narrow type, once sure 

607 self._read(file_path, file_path.name) 

608 else: 

609 # assume a path if it is not a file-object 

610 file_path = cast(PathLike, file_path) 

611 try: 

612 with open(file_path, "rb") as fp: 

613 file_ok = True 

614 self._read(fp, fp.name) 

615 except IOError: 

616 continue 

617 

618 # Read includes and append those that we didn't handle yet 

619 # We expect all paths to be normalized and absolute (and will assure that is the case) 

620 if self._has_includes(): 

621 for _, include_path in self._included_paths(): 

622 if include_path.startswith("~"): 

623 include_path = osp.expanduser(include_path) 

624 if not osp.isabs(include_path): 

625 if not file_ok: 

626 continue 

627 # end ignore relative paths if we don't know the configuration file path 

628 file_path = cast(PathLike, file_path) 

629 assert osp.isabs(file_path), "Need absolute paths to be sure our cycle checks will work" 

630 include_path = osp.join(osp.dirname(file_path), include_path) 

631 # end make include path absolute 

632 include_path = osp.normpath(include_path) 

633 if include_path in seen or not os.access(include_path, os.R_OK): 

634 continue 

635 seen.add(include_path) 

636 # insert included file to the top to be considered first 

637 files_to_read.insert(0, include_path) 

638 num_read_include_files += 1 

639 # each include path in configuration file 

640 # end handle includes 

641 # END for each file object to read 

642 

643 # If there was no file included, we can safely write back (potentially) the configuration file 

644 # without altering it's meaning 

645 if num_read_include_files == 0: 

646 self._merge_includes = False 

647 # end 

648 

649 def _write(self, fp: IO) -> None: 

650 """Write an .ini-format representation of the configuration state in 

651 git compatible format""" 

652 

653 def write_section(name: str, section_dict: _OMD) -> None: 

654 fp.write(("[%s]\n" % name).encode(defenc)) 

655 

656 values: Sequence[str] # runtime only gets str in tests, but should be whatever _OMD stores 

657 v: str 

658 for (key, values) in section_dict.items_all(): 

659 if key == "__name__": 

660 continue 

661 

662 for v in values: 

663 fp.write(("\t%s = %s\n" % (key, self._value_to_string(v).replace("\n", "\n\t"))).encode(defenc)) 

664 # END if key is not __name__ 

665 

666 # END section writing 

667 

668 if self._defaults: 

669 write_section(cp.DEFAULTSECT, self._defaults) 

670 value: _OMD 

671 

672 for name, value in self._sections.items(): 

673 write_section(name, value) 

674 

675 def items(self, section_name: str) -> List[Tuple[str, str]]: # type: ignore[override] 

676 """:return: list((option, value), ...) pairs of all items in the given section""" 

677 return [(k, v) for k, v in super(GitConfigParser, self).items(section_name) if k != "__name__"] 

678 

679 def items_all(self, section_name: str) -> List[Tuple[str, List[str]]]: 

680 """:return: list((option, [values...]), ...) pairs of all items in the given section""" 

681 rv = _OMD(self._defaults) 

682 

683 for k, vs in self._sections[section_name].items_all(): 

684 if k == "__name__": 

685 continue 

686 

687 if k in rv and rv.getall(k) == vs: 

688 continue 

689 

690 for v in vs: 

691 rv.add(k, v) 

692 

693 return rv.items_all() 

694 

695 @needs_values 

696 def write(self) -> None: 

697 """Write changes to our file, if there are changes at all 

698 

699 :raise IOError: if this is a read-only writer instance or if we could not obtain 

700 a file lock""" 

701 self._assure_writable("write") 

702 if not self._dirty: 

703 return None 

704 

705 if isinstance(self._file_or_files, (list, tuple)): 

706 raise AssertionError( 

707 "Cannot write back if there is not exactly a single file to write to, have %i files" 

708 % len(self._file_or_files) 

709 ) 

710 # end assert multiple files 

711 

712 if self._has_includes(): 

713 log.debug( 

714 "Skipping write-back of configuration file as include files were merged in." 

715 + "Set merge_includes=False to prevent this." 

716 ) 

717 return None 

718 # end 

719 

720 fp = self._file_or_files 

721 

722 # we have a physical file on disk, so get a lock 

723 is_file_lock = isinstance(fp, (str, os.PathLike, IOBase)) # can't use Pathlike until 3.5 dropped 

724 if is_file_lock and self._lock is not None: # else raise Error? 

725 self._lock._obtain_lock() 

726 

727 if not hasattr(fp, "seek"): 

728 fp = cast(PathLike, fp) 

729 with open(fp, "wb") as fp_open: 

730 self._write(fp_open) 

731 else: 

732 fp = cast("BytesIO", fp) 

733 fp.seek(0) 

734 # make sure we do not overwrite into an existing file 

735 if hasattr(fp, "truncate"): 

736 fp.truncate() 

737 self._write(fp) 

738 

739 def _assure_writable(self, method_name: str) -> None: 

740 if self.read_only: 

741 raise IOError("Cannot execute non-constant method %s.%s" % (self, method_name)) 

742 

743 def add_section(self, section: str) -> None: 

744 """Assures added options will stay in order""" 

745 return super(GitConfigParser, self).add_section(section) 

746 

747 @property 

748 def read_only(self) -> bool: 

749 """:return: True if this instance may change the configuration file""" 

750 return self._read_only 

751 

752 def get_value( 

753 self, 

754 section: str, 

755 option: str, 

756 default: Union[int, float, str, bool, None] = None, 

757 ) -> Union[int, float, str, bool]: 

758 # can default or return type include bool? 

759 """Get an option's value. 

760 

761 If multiple values are specified for this option in the section, the 

762 last one specified is returned. 

763 

764 :param default: 

765 If not None, the given default value will be returned in case 

766 the option did not exist 

767 :return: a properly typed value, either int, float or string 

768 

769 :raise TypeError: in case the value could not be understood 

770 Otherwise the exceptions known to the ConfigParser will be raised.""" 

771 try: 

772 valuestr = self.get(section, option) 

773 except Exception: 

774 if default is not None: 

775 return default 

776 raise 

777 

778 return self._string_to_value(valuestr) 

779 

780 def get_values( 

781 self, 

782 section: str, 

783 option: str, 

784 default: Union[int, float, str, bool, None] = None, 

785 ) -> List[Union[int, float, str, bool]]: 

786 """Get an option's values. 

787 

788 If multiple values are specified for this option in the section, all are 

789 returned. 

790 

791 :param default: 

792 If not None, a list containing the given default value will be 

793 returned in case the option did not exist 

794 :return: a list of properly typed values, either int, float or string 

795 

796 :raise TypeError: in case the value could not be understood 

797 Otherwise the exceptions known to the ConfigParser will be raised.""" 

798 try: 

799 lst = self._sections[section].getall(option) 

800 except Exception: 

801 if default is not None: 

802 return [default] 

803 raise 

804 

805 return [self._string_to_value(valuestr) for valuestr in lst] 

806 

807 def _string_to_value(self, valuestr: str) -> Union[int, float, str, bool]: 

808 types = (int, float) 

809 for numtype in types: 

810 try: 

811 val = numtype(valuestr) 

812 # truncated value ? 

813 if val != float(valuestr): 

814 continue 

815 return val 

816 except (ValueError, TypeError): 

817 continue 

818 # END for each numeric type 

819 

820 # try boolean values as git uses them 

821 vl = valuestr.lower() 

822 if vl == "false": 

823 return False 

824 if vl == "true": 

825 return True 

826 

827 if not isinstance(valuestr, str): 

828 raise TypeError( 

829 "Invalid value type: only int, long, float and str are allowed", 

830 valuestr, 

831 ) 

832 

833 return valuestr 

834 

835 def _value_to_string(self, value: Union[str, bytes, int, float, bool]) -> str: 

836 if isinstance(value, (int, float, bool)): 

837 return str(value) 

838 return force_text(value) 

839 

840 @needs_values 

841 @set_dirty_and_flush_changes 

842 def set_value(self, section: str, option: str, value: Union[str, bytes, int, float, bool]) -> "GitConfigParser": 

843 """Sets the given option in section to the given value. 

844 It will create the section if required, and will not throw as opposed to the default 

845 ConfigParser 'set' method. 

846 

847 :param section: Name of the section in which the option resides or should reside 

848 :param option: Name of the options whose value to set 

849 

850 :param value: Value to set the option to. It must be a string or convertible 

851 to a string 

852 :return: this instance""" 

853 if not self.has_section(section): 

854 self.add_section(section) 

855 self.set(section, option, self._value_to_string(value)) 

856 return self 

857 

858 @needs_values 

859 @set_dirty_and_flush_changes 

860 def add_value(self, section: str, option: str, value: Union[str, bytes, int, float, bool]) -> "GitConfigParser": 

861 """Adds a value for the given option in section. 

862 It will create the section if required, and will not throw as opposed to the default 

863 ConfigParser 'set' method. The value becomes the new value of the option as returned 

864 by 'get_value', and appends to the list of values returned by 'get_values`'. 

865 

866 :param section: Name of the section in which the option resides or should reside 

867 :param option: Name of the option 

868 

869 :param value: Value to add to option. It must be a string or convertible 

870 to a string 

871 :return: this instance""" 

872 if not self.has_section(section): 

873 self.add_section(section) 

874 self._sections[section].add(option, self._value_to_string(value)) 

875 return self 

876 

877 def rename_section(self, section: str, new_name: str) -> "GitConfigParser": 

878 """rename the given section to new_name 

879 :raise ValueError: if section doesn't exit 

880 :raise ValueError: if a section with new_name does already exist 

881 :return: this instance 

882 """ 

883 if not self.has_section(section): 

884 raise ValueError("Source section '%s' doesn't exist" % section) 

885 if self.has_section(new_name): 

886 raise ValueError("Destination section '%s' already exists" % new_name) 

887 

888 super(GitConfigParser, self).add_section(new_name) 

889 new_section = self._sections[new_name] 

890 for k, vs in self.items_all(section): 

891 new_section.setall(k, vs) 

892 # end for each value to copy 

893 

894 # This call writes back the changes, which is why we don't have the respective decorator 

895 self.remove_section(section) 

896 return self