Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/psycopg2/extras.py: 21%

644 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1"""Miscellaneous goodies for psycopg2 

2 

3This module is a generic place used to hold little helper functions 

4and classes until a better place in the distribution is found. 

5""" 

6# psycopg/extras.py - miscellaneous extra goodies for psycopg 

7# 

8# Copyright (C) 2003-2019 Federico Di Gregorio <fog@debian.org> 

9# Copyright (C) 2020-2021 The Psycopg Team 

10# 

11# psycopg2 is free software: you can redistribute it and/or modify it 

12# under the terms of the GNU Lesser General Public License as published 

13# by the Free Software Foundation, either version 3 of the License, or 

14# (at your option) any later version. 

15# 

16# In addition, as a special exception, the copyright holders give 

17# permission to link this program with the OpenSSL library (or with 

18# modified versions of OpenSSL that use the same license as OpenSSL), 

19# and distribute linked combinations including the two. 

20# 

21# You must obey the GNU Lesser General Public License in all respects for 

22# all of the code used other than OpenSSL. 

23# 

24# psycopg2 is distributed in the hope that it will be useful, but WITHOUT 

25# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 

26# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public 

27# License for more details. 

28 

29import os as _os 

30import time as _time 

31import re as _re 

32from collections import namedtuple, OrderedDict 

33 

34import logging as _logging 

35 

36import psycopg2 

37from psycopg2 import extensions as _ext 

38from .extensions import cursor as _cursor 

39from .extensions import connection as _connection 

40from .extensions import adapt as _A, quote_ident 

41from functools import lru_cache 

42 

43from psycopg2._psycopg import ( # noqa 

44 REPLICATION_PHYSICAL, REPLICATION_LOGICAL, 

45 ReplicationConnection as _replicationConnection, 

46 ReplicationCursor as _replicationCursor, 

47 ReplicationMessage) 

48 

49 

50# expose the json adaptation stuff into the module 

51from psycopg2._json import ( # noqa 

52 json, Json, register_json, register_default_json, register_default_jsonb) 

53 

54 

55# Expose range-related objects 

56from psycopg2._range import ( # noqa 

57 Range, NumericRange, DateRange, DateTimeRange, DateTimeTZRange, 

58 register_range, RangeAdapter, RangeCaster) 

59 

60 

61# Expose ipaddress-related objects 

62from psycopg2._ipaddress import register_ipaddress # noqa 

63 

64 

class DictCursorBase(_cursor):
    """Base class for all dict-like cursors.

    Subclasses supply a ``row_factory`` and may enable ``_prefetch`` when
    the row factory needs the column index available before rows are built.
    """

    def __init__(self, *args, **kwargs):
        if 'row_factory' not in kwargs:
            raise NotImplementedError(
                "DictCursorBase can't be instantiated without a row factory.")
        row_factory = kwargs.pop('row_factory')
        super().__init__(*args, **kwargs)
        self._query_executed = False
        self._prefetch = False
        self.row_factory = row_factory

    def fetchone(self):
        # In prefetch mode the row is fetched before the index is built;
        # otherwise the index is built first and the fetch happens after.
        if self._prefetch:
            row = super().fetchone()
        if self._query_executed:
            self._build_index()
        if not self._prefetch:
            row = super().fetchone()
        return row

    def fetchmany(self, size=None):
        if self._prefetch:
            rows = super().fetchmany(size)
        if self._query_executed:
            self._build_index()
        if not self._prefetch:
            rows = super().fetchmany(size)
        return rows

    def fetchall(self):
        if self._prefetch:
            rows = super().fetchall()
        if self._query_executed:
            self._build_index()
        if not self._prefetch:
            rows = super().fetchall()
        return rows

    def __iter__(self):
        try:
            if self._prefetch:
                it = super().__iter__()
                head = next(it)
            if self._query_executed:
                self._build_index()
            if not self._prefetch:
                it = super().__iter__()
                head = next(it)

            yield head
            yield from it
        except StopIteration:
            return

123 

124 

class DictConnection(_connection):
    """A connection that uses `DictCursor` automatically."""

    def cursor(self, *args, **kwargs):
        # Honor an explicitly configured factory, else default to DictCursor.
        factory = self.cursor_factory or DictCursor
        kwargs.setdefault('cursor_factory', factory)
        return super().cursor(*args, **kwargs)

130 

131 

class DictCursor(DictCursorBase):
    """A cursor that keeps a list of column name -> index mappings__.

    .. __: https://docs.python.org/glossary.html#term-mapping
    """

    def __init__(self, *args, **kwargs):
        kwargs['row_factory'] = DictRow
        super().__init__(*args, **kwargs)
        # Rows must be fetched before being wrapped, so the index exists
        # when DictRow instances are created.
        self._prefetch = True

    def execute(self, query, vars=None):
        # Drop the previous name -> position map; rebuilt on first fetch.
        self.index = OrderedDict()
        self._query_executed = True
        return super().execute(query, vars)

    def callproc(self, procname, vars=None):
        self.index = OrderedDict()
        self._query_executed = True
        return super().callproc(procname, vars)

    def _build_index(self):
        # Map every column name of the result set to its tuple position.
        if self._query_executed and self.description:
            for pos, col in enumerate(self.description):
                self.index[col[0]] = pos
            self._query_executed = False

158 

159 

class DictRow(list):
    """A row object that allows access to data both by position and by
    column name."""

    __slots__ = ('_index',)

    def __init__(self, cursor):
        # The name -> position map is shared with the originating cursor.
        self._index = cursor.index
        self[:] = [None] * len(cursor.description)

    def __getitem__(self, key):
        if not isinstance(key, (int, slice)):
            key = self._index[key]
        return super().__getitem__(key)

    def __setitem__(self, key, value):
        if not isinstance(key, (int, slice)):
            key = self._index[key]
        super().__setitem__(key, value)

    def items(self):
        get = super().__getitem__
        return ((name, get(pos)) for name, pos in self._index.items())

    def keys(self):
        return iter(self._index)

    def values(self):
        get = super().__getitem__
        return (get(pos) for pos in self._index.values())

    def get(self, key, default=None):
        try:
            return self[key]
        except Exception:
            return default

    def copy(self):
        return OrderedDict(self.items())

    def __contains__(self, key):
        return key in self._index

    def __reduce__(self):
        # this is apparently useless, but it fixes #1073
        return super().__reduce__()

    def __getstate__(self):
        # Pickle both the values and a copy of the shared index.
        return self[:], self._index.copy()

    def __setstate__(self, data):
        self[:] = data[0]
        self._index = data[1]

212 

213 

class RealDictConnection(_connection):
    """A connection that uses `RealDictCursor` automatically."""

    def cursor(self, *args, **kwargs):
        # Honor an explicitly configured factory, else use RealDictCursor.
        factory = self.cursor_factory or RealDictCursor
        kwargs.setdefault('cursor_factory', factory)
        return super().cursor(*args, **kwargs)

219 

220 

class RealDictCursor(DictCursorBase):
    """A cursor that uses a real dict as the base type for rows.

    Note that this cursor is extremely specialized and does not allow
    the normal access (using integer indices) to fetched data. If you need
    to access database rows both as a dictionary and a list, then use
    the generic `DictCursor` instead of `!RealDictCursor`.
    """

    def __init__(self, *args, **kwargs):
        kwargs['row_factory'] = RealDictRow
        super().__init__(*args, **kwargs)

    def execute(self, query, vars=None):
        # Forget the previous result's columns; rebuilt lazily on fetch.
        self.column_mapping = []
        self._query_executed = True
        return super().execute(query, vars)

    def callproc(self, procname, vars=None):
        self.column_mapping = []
        self._query_executed = True
        return super().callproc(procname, vars)

    def _build_index(self):
        # Record the column names of the current result set, in order.
        if self._query_executed and self.description:
            self.column_mapping = [d[0] for d in self.description]
            self._query_executed = False

247 

248 

class RealDictRow(OrderedDict):
    """A `!dict` subclass representing a data record."""

    def __init__(self, *args, **kwargs):
        # A leading cursor argument means we are being used as row factory.
        if args and isinstance(args[0], _cursor):
            cursor, args = args[0], args[1:]
        else:
            cursor = None

        super().__init__(*args, **kwargs)

        if cursor is not None:
            # Required for named cursors
            if cursor.description and not cursor.column_mapping:
                cursor._build_index()

            # Store the cols mapping in the dict itself until the row is fully
            # populated, so we don't need to add attributes to the class
            # (hence keeping its maintenance, special pickle support, etc.)
            self[RealDictRow] = cursor.column_mapping

    def __setitem__(self, key, value):
        if RealDictRow not in self:
            # Normal dict behaviour once the row is built.
            super().__setitem__(key, value)
            return

        # Row building phase: positional keys are translated to column names.
        mapping = self[RealDictRow]
        super().__setitem__(mapping[key], value)
        if key == len(mapping) - 1:
            # Last column stored: building finished, drop the marker.
            del self[RealDictRow]

282 

283 

class NamedTupleConnection(_connection):
    """A connection that uses `NamedTupleCursor` automatically."""

    def cursor(self, *args, **kwargs):
        # Honor an explicitly configured factory, else use NamedTupleCursor.
        factory = self.cursor_factory or NamedTupleCursor
        kwargs.setdefault('cursor_factory', factory)
        return super().cursor(*args, **kwargs)

289 

290 

class NamedTupleCursor(_cursor):
    """A cursor that generates results as `~collections.namedtuple`.

    `!fetch*()` methods will return named tuples instead of regular tuples, so
    their elements can be accessed both as regular numeric items as well as
    attributes.

        >>> nt_cur = conn.cursor(cursor_factory=psycopg2.extras.NamedTupleCursor)
        >>> rec = nt_cur.fetchone()
        >>> rec
        Record(id=1, num=100, data="abc'def")
        >>> rec[1]
        100
        >>> rec.data
        "abc'def"
    """
    # Namedtuple class for the current result set; reset on each query.
    Record = None
    MAX_CACHE = 1024

    def execute(self, query, vars=None):
        self.Record = None
        return super().execute(query, vars)

    def executemany(self, query, vars):
        self.Record = None
        return super().executemany(query, vars)

    def callproc(self, procname, vars=None):
        self.Record = None
        return super().callproc(procname, vars)

    def fetchone(self):
        row = super().fetchone()
        if row is None:
            return None
        rec = self.Record
        if rec is None:
            rec = self.Record = self._make_nt()
        return rec._make(row)

    def fetchmany(self, size=None):
        rows = super().fetchmany(size)
        rec = self.Record
        if rec is None:
            rec = self.Record = self._make_nt()
        return [rec._make(row) for row in rows]

    def fetchall(self):
        rows = super().fetchall()
        rec = self.Record
        if rec is None:
            rec = self.Record = self._make_nt()
        return [rec._make(row) for row in rows]

    def __iter__(self):
        it = super().__iter__()
        try:
            row = next(it)
        except StopIteration:
            return

        rec = self.Record
        if rec is None:
            rec = self.Record = self._make_nt()

        yield rec._make(row)
        for row in it:
            yield rec._make(row)

    def _make_nt(self):
        # The cache key is the tuple of column names of the result set.
        key = tuple(d[0] for d in self.description) if self.description else ()
        return self._cached_make_nt(key)

    @classmethod
    def _do_make_nt(cls, key):
        fields = []
        for name in key:
            name = _re_clean.sub('_', name)
            # Python identifier cannot start with numbers, namedtuple fields
            # cannot start with underscore. So...
            if name[0] == '_' or '0' <= name[0] <= '9':
                name = 'f' + name
            fields.append(name)

        return namedtuple("Record", fields)

377 

378 

379@lru_cache(512) 

380def _cached_make_nt(cls, key): 

381 return cls._do_make_nt(key) 

382 

383 

# Attach the cached factory to the class as a classmethod. It is kept as a
# module-level function and exposed for testability, and so someone can
# monkeypatch it to tweak the cache size.
NamedTupleCursor._cached_make_nt = classmethod(_cached_make_nt)

387 

388 

class LoggingConnection(_connection):
    """A connection that logs all queries to a file or logger__ object.

    .. __: https://docs.python.org/library/logging.html
    """

    def initialize(self, logobj):
        """Initialize the connection to log to `!logobj`.

        The `!logobj` parameter can be an open file object or a
        Logger/LoggerAdapter instance from the standard logging module.
        """
        self._logobj = logobj
        is_logger = _logging and isinstance(
            logobj, (_logging.Logger, _logging.LoggerAdapter))
        self.log = self._logtologger if is_logger else self._logtofile

    def filter(self, msg, curs):
        """Filter the query before logging it.

        This is the method to overwrite to filter unwanted queries out of the
        log or to add some extra data to the output. The default implementation
        just does nothing.
        """
        return msg

    def _logtofile(self, msg, curs):
        # Write the (possibly filtered) message to the file-like object.
        msg = self.filter(msg, curs)
        if msg:
            if isinstance(msg, bytes):
                msg = msg.decode(_ext.encodings[self.encoding], 'replace')
            self._logobj.write(msg + _os.linesep)

    def _logtologger(self, msg, curs):
        # Emit the (possibly filtered) message at DEBUG level.
        msg = self.filter(msg, curs)
        if msg:
            self._logobj.debug(msg)

    def _check(self):
        # Guard: the connection must be initialize()d before use.
        if not hasattr(self, '_logobj'):
            raise self.ProgrammingError(
                "LoggingConnection object has not been initialize()d")

    def cursor(self, *args, **kwargs):
        self._check()
        factory = self.cursor_factory or LoggingCursor
        kwargs.setdefault('cursor_factory', factory)
        return super().cursor(*args, **kwargs)

438 

439 

class LoggingCursor(_cursor):
    """A cursor that logs queries using its connection logging facilities."""

    def execute(self, query, vars=None):
        try:
            return super().execute(query, vars)
        finally:
            # Log even when execution raised: self.query holds the query
            # actually sent to the backend.
            self.connection.log(self.query, self)

    def callproc(self, procname, vars=None):
        try:
            return super().callproc(procname, vars)
        finally:
            self.connection.log(self.query, self)

454 

455 

class MinTimeLoggingConnection(LoggingConnection):
    """A connection that logs queries based on execution time.

    This is just an example of how to sub-class `LoggingConnection` to
    provide some extra filtering for the logged queries. Both the
    `initialize()` and `filter()` methods are overwritten to make sure
    that only queries executing for more than ``mintime`` ms are logged.

    Note that this connection uses the specialized cursor
    `MinTimeLoggingCursor`.
    """

    def initialize(self, logobj, mintime=0):
        LoggingConnection.initialize(self, logobj)
        self._mintime = mintime

    def filter(self, msg, curs):
        # Elapsed milliseconds since the cursor recorded its start time.
        t = (_time.time() - curs.timestamp) * 1000
        if t > self._mintime:
            if isinstance(msg, bytes):
                msg = msg.decode(_ext.encodings[self.encoding], 'replace')
            return f"{msg}{_os.linesep} (execution time: {t} ms)"
        # Queries faster than mintime return None and are not logged.

    def cursor(self, *args, **kwargs):
        kwargs.setdefault('cursor_factory',
                          self.cursor_factory or MinTimeLoggingCursor)
        return LoggingConnection.cursor(self, *args, **kwargs)

482 

483 

class MinTimeLoggingCursor(LoggingCursor):
    """The cursor sub-class companion to `MinTimeLoggingConnection`."""

    def execute(self, query, vars=None):
        # Record the start time so the connection's filter() can compute
        # the execution time of this query.
        self.timestamp = _time.time()
        return LoggingCursor.execute(self, query, vars)

    def callproc(self, procname, vars=None):
        # Record the start time so the connection's filter() can compute
        # the execution time of this procedure call.
        self.timestamp = _time.time()
        return LoggingCursor.callproc(self, procname, vars)

494 

495 

class LogicalReplicationConnection(_replicationConnection):
    """A replication connection preset for logical replication."""

    def __init__(self, *args, **kwargs):
        # Force the replication type; everything else is passed through.
        kwargs['replication_type'] = REPLICATION_LOGICAL
        super().__init__(*args, **kwargs)

501 

502 

class PhysicalReplicationConnection(_replicationConnection):
    """A replication connection preset for physical replication."""

    def __init__(self, *args, **kwargs):
        # Force the replication type; everything else is passed through.
        kwargs['replication_type'] = REPLICATION_PHYSICAL
        super().__init__(*args, **kwargs)

508 

509 

class StopReplication(Exception):
    """
    Exception used to break out of the endless loop in
    `~ReplicationCursor.consume_stream()`.

    Subclass of `~exceptions.Exception`. Intentionally *not* inherited from
    `~psycopg2.Error` as occurrence of this exception does not indicate an
    error.
    """

520 

521 

class ReplicationCursor(_replicationCursor):
    """A cursor used for communication on replication connections."""

    def create_replication_slot(self, slot_name, slot_type=None, output_plugin=None):
        """Create streaming replication slot."""
        command = f"CREATE_REPLICATION_SLOT {quote_ident(slot_name, self)} "

        if slot_type is None:
            slot_type = self.connection.replication_type

        if slot_type == REPLICATION_LOGICAL:
            # Logical slots always need a decoding output plugin.
            if output_plugin is None:
                raise psycopg2.ProgrammingError(
                    "output plugin name is required to create "
                    "logical replication slot")
            command += f"LOGICAL {quote_ident(output_plugin, self)}"

        elif slot_type == REPLICATION_PHYSICAL:
            if output_plugin is not None:
                raise psycopg2.ProgrammingError(
                    "cannot specify output plugin name when creating "
                    "physical replication slot")
            command += "PHYSICAL"

        else:
            raise psycopg2.ProgrammingError(
                f"unrecognized replication type: {repr(slot_type)}")

        self.execute(command)

    def drop_replication_slot(self, slot_name):
        """Drop streaming replication slot."""
        self.execute(f"DROP_REPLICATION_SLOT {quote_ident(slot_name, self)}")

    def start_replication(
            self, slot_name=None, slot_type=None, start_lsn=0,
            timeline=0, options=None, decode=False, status_interval=10):
        """Start replication stream."""
        command = "START_REPLICATION "

        if slot_type is None:
            slot_type = self.connection.replication_type

        if slot_type == REPLICATION_LOGICAL:
            if not slot_name:
                raise psycopg2.ProgrammingError(
                    "slot name is required for logical replication")
            command += f"SLOT {quote_ident(slot_name, self)} "
            command += "LOGICAL "

        elif slot_type == REPLICATION_PHYSICAL:
            if slot_name:
                command += f"SLOT {quote_ident(slot_name, self)} "
            # don't add "PHYSICAL", before 9.4 it was just START_REPLICATION XXX/XXX

        else:
            raise psycopg2.ProgrammingError(
                f"unrecognized replication type: {repr(slot_type)}")

        # Normalize the starting LSN into the "XXX/XXX" hex form.
        # NOTE: a strict type() check (not isinstance) is kept on purpose to
        # preserve the historical behavior for str subclasses.
        if type(start_lsn) is str:
            parts = start_lsn.split('/')
            lsn = f"{int(parts[0], 16):X}/{int(parts[1], 16):08X}"
        else:
            # Split a 64-bit integer LSN into its two 32-bit halves.
            lsn = f"{start_lsn >> 32 & 4294967295:X}/{start_lsn & 4294967295:08X}"

        command += lsn

        if timeline != 0:
            if slot_type == REPLICATION_LOGICAL:
                raise psycopg2.ProgrammingError(
                    "cannot specify timeline for logical replication")
            command += f" TIMELINE {timeline}"

        if options:
            if slot_type == REPLICATION_PHYSICAL:
                raise psycopg2.ProgrammingError(
                    "cannot specify output plugin options for physical replication")
            pairs = ", ".join(
                f"{quote_ident(k, self)} {_A(str(v))}"
                for k, v in options.items())
            command += f" ({pairs})"

        self.start_replication_expert(
            command, decode=decode, status_interval=status_interval)

    # allows replication cursors to be used in select.select() directly
    def fileno(self):
        return self.connection.fileno()

622 

623 

624# a dbtype and adapter for Python UUID type 

625 

class UUID_adapter:
    """Adapt Python's uuid.UUID__ type to PostgreSQL's uuid__.

    .. __: https://docs.python.org/library/uuid.html
    .. __: https://www.postgresql.org/docs/current/static/datatype-uuid.html
    """

    def __init__(self, uuid):
        self._uuid = uuid

    def __conform__(self, proto):
        # Conform only to the SQL-quoting protocol.
        if proto is _ext.ISQLQuote:
            return self

    def getquoted(self):
        return f"'{self._uuid}'::uuid".encode('utf8')

    def __str__(self):
        return f"'{self._uuid}'::uuid"

645 

646 

def register_uuid(oids=None, conn_or_curs=None):
    """Create the UUID type and an uuid.UUID adapter.

    :param oids: oid for the PostgreSQL :sql:`uuid` type, or 2-items sequence
        with oids of the type and the array. If not specified, use PostgreSQL
        standard oids.
    :param conn_or_curs: where to register the typecaster. If not specified,
        register it globally.
    """
    import uuid

    if not oids:
        # Standard oids of the uuid type and its array type.
        oid1, oid2 = 2950, 2951
    elif isinstance(oids, (list, tuple)):
        oid1, oid2 = oids
    else:
        oid1, oid2 = oids, 2951

    _ext.UUID = _ext.new_type(
        (oid1,), "UUID",
        lambda data, cursor: data and uuid.UUID(data) or None)
    _ext.UUIDARRAY = _ext.new_array_type((oid2,), "UUID[]", _ext.UUID)

    _ext.register_type(_ext.UUID, conn_or_curs)
    _ext.register_type(_ext.UUIDARRAY, conn_or_curs)
    _ext.register_adapter(uuid.UUID, UUID_adapter)

    return _ext.UUID

677 

678 

679# a type, dbtype and adapter for PostgreSQL inet type 

680 

class Inet:
    """Wrap a string to allow for correct SQL-quoting of inet values.

    Note that this adapter does NOT check the passed value to make
    sure it really is an inet-compatible address but DOES call adapt()
    on it to make sure it is impossible to execute an SQL-injection
    by passing an evil value to the initializer.
    """

    def __init__(self, addr):
        self.addr = addr

    def __repr__(self):
        return f"{self.__class__.__name__}({self.addr!r})"

    def prepare(self, conn):
        # Remember the connection so the inner adapter can be prepared too.
        self._conn = conn

    def getquoted(self):
        # Delegate the actual quoting to the adapter of the wrapped value.
        inner = _A(self.addr)
        if hasattr(inner, 'prepare'):
            inner.prepare(self._conn)
        return inner.getquoted() + b"::inet"

    def __conform__(self, proto):
        if proto is _ext.ISQLQuote:
            return self

    def __str__(self):
        return str(self.addr)

710 

711 

def register_inet(oid=None, conn_or_curs=None):
    """Create the INET type and an Inet adapter.

    :param oid: oid for the PostgreSQL :sql:`inet` type, or 2-items sequence
        with oids of the type and the array. If not specified, use PostgreSQL
        standard oids.
    :param conn_or_curs: where to register the typecaster. If not specified,
        register it globally.
    """
    import warnings
    warnings.warn(
        "the inet adapter is deprecated, it's not very useful",
        DeprecationWarning)

    if not oid:
        # Standard oids of the inet type and its array type.
        oid1, oid2 = 869, 1041
    elif isinstance(oid, (list, tuple)):
        oid1, oid2 = oid
    else:
        oid1, oid2 = oid, 1041

    _ext.INET = _ext.new_type(
        (oid1,), "INET",
        lambda data, cursor: data and Inet(data) or None)
    _ext.INETARRAY = _ext.new_array_type((oid2,), "INETARRAY", _ext.INET)

    _ext.register_type(_ext.INET, conn_or_curs)
    _ext.register_type(_ext.INETARRAY, conn_or_curs)

    return _ext.INET

743 

744 

def wait_select(conn):
    """Wait until a connection or cursor has data available.

    The function is an example of a wait callback to be registered with
    `~psycopg2.extensions.set_wait_callback()`. This function uses
    :py:func:`~select.select()` to wait for data to become available, and
    therefore is able to handle/receive SIGINT/KeyboardInterrupt.
    """
    import select
    from psycopg2.extensions import POLL_OK, POLL_READ, POLL_WRITE

    while True:
        try:
            status = conn.poll()
            if status == POLL_OK:
                break
            if status == POLL_READ:
                select.select([conn.fileno()], [], [])
            elif status == POLL_WRITE:
                select.select([], [conn.fileno()], [])
            else:
                raise conn.OperationalError(f"bad state from poll: {status}")
        except KeyboardInterrupt:
            # Cancel the running command; the loop will then be broken by
            # a server error.
            conn.cancel()
            continue

771 

772 

def _solve_conn_curs(conn_or_curs):
    """Return the connection and a DBAPI cursor from a connection or cursor."""
    if conn_or_curs is None:
        raise psycopg2.ProgrammingError("no connection or cursor provided")

    # A cursor is recognized by its execute() method; in either case a new
    # plain (non-dict) cursor is created for internal queries.
    if hasattr(conn_or_curs, 'execute'):
        conn = conn_or_curs.connection
    else:
        conn = conn_or_curs
    curs = conn.cursor(cursor_factory=_cursor)

    return conn, curs

786 

787 

class HstoreAdapter:
    """Adapt a Python dict to the hstore syntax."""

    def __init__(self, wrapped):
        self.wrapped = wrapped

    def prepare(self, conn):
        self.conn = conn

        # use an old-style getquoted implementation if required
        if conn.info.server_version < 90000:
            self.getquoted = self._getquoted_8

    def _getquoted_8(self):
        """Use the operators available in PG pre-9.0."""
        if not self.wrapped:
            return b"''::hstore"

        adapt = _ext.adapt
        pairs = []
        for key, value in self.wrapped.items():
            akey = adapt(key)
            akey.prepare(self.conn)
            qkey = akey.getquoted()

            if value is not None:
                avalue = adapt(value)
                avalue.prepare(self.conn)
                qvalue = avalue.getquoted()
            else:
                qvalue = b'NULL'

            # XXX this b'ing is painfully inefficient!
            pairs.append(b"(" + qkey + b" => " + qvalue + b")")

        return b"(" + b'||'.join(pairs) + b")"

    def _getquoted_9(self):
        """Use the hstore(text[], text[]) function."""
        if not self.wrapped:
            return b"''::hstore"

        keys = _ext.adapt(list(self.wrapped.keys()))
        keys.prepare(self.conn)
        values = _ext.adapt(list(self.wrapped.values()))
        values.prepare(self.conn)
        return b"hstore(" + keys.getquoted() + b", " + values.getquoted() + b")"

    getquoted = _getquoted_9

    _re_hstore = _re.compile(r"""
        # hstore key:
        # a string of normal or escaped chars
        "((?: [^"\\] | \\. )*)"
        \s*=>\s* # hstore value
        (?:
            NULL # the value can be null - not catched
            # or a quoted string like the key
            | "((?: [^"\\] | \\. )*)"
        )
        (?:\s*,\s*|$) # pairs separated by comma or end of string.
        """, _re.VERBOSE)

    @classmethod
    def parse(cls, s, cur, _bsdec=_re.compile(r"\\(.)")):
        """Parse an hstore representation in a Python string.

        The hstore is represented as something like::

            "a"=>"1", "b"=>"2"

        with backslash-escaped strings.
        """
        if s is None:
            return None

        rv = {}
        pos = 0
        for m in cls._re_hstore.finditer(s):
            # Every match must start exactly where the previous one ended.
            if m is None or m.start() != pos:
                raise psycopg2.InterfaceError(
                    f"error parsing hstore pair at char {pos}")
            key = _bsdec.sub(r'\1', m.group(1))
            value = m.group(2)
            if value is not None:
                value = _bsdec.sub(r'\1', value)

            rv[key] = value
            pos = m.end()

        if pos < len(s):
            raise psycopg2.InterfaceError(
                f"error parsing hstore: unparsed data after char {pos}")

        return rv

    @classmethod
    def parse_unicode(cls, s, cur):
        """Parse an hstore returning unicode keys and values."""
        if s is None:
            return None

        s = s.decode(_ext.encodings[cur.connection.encoding])
        return cls.parse(s, cur)

    @classmethod
    def get_oids(cls, conn_or_curs):
        """Return the lists of OID of the hstore and hstore[] types."""
        conn, curs = _solve_conn_curs(conn_or_curs)

        # Store the transaction status of the connection to revert it after use
        conn_status = conn.status

        # column typarray not available before PG 8.3
        typarray = conn.info.server_version >= 80300 and "typarray" or "NULL"

        rv0, rv1 = [], []

        # get the oid for the hstore
        curs.execute(f"""SELECT t.oid, {typarray}
FROM pg_type t JOIN pg_namespace ns
    ON typnamespace = ns.oid
WHERE typname = 'hstore';
""")
        for oids in curs:
            rv0.append(oids[0])
            rv1.append(oids[1])

        # revert the status of the connection as before the command
        if (conn_status != _ext.STATUS_IN_TRANSACTION
                and not conn.autocommit):
            conn.rollback()

        return tuple(rv0), tuple(rv1)

922 

923 

def register_hstore(conn_or_curs, globally=False, unicode=False,
                    oid=None, array_oid=None):
    r"""Register adapter and typecaster for `!dict`\-\ |hstore| conversions.

    :param conn_or_curs: a connection or cursor: the typecaster will be
        registered only on this object unless *globally* is set to `!True`
    :param globally: register the adapter globally, not only on *conn_or_curs*
    :param unicode: if `!True`, keys and values returned from the database
        will be `!unicode` instead of `!str`. The option is not available on
        Python 3
    :param oid: the OID of the |hstore| type if known. If not, it will be
        queried on *conn_or_curs*.
    :param array_oid: the OID of the |hstore| array type if known. If not, it
        will be queried on *conn_or_curs*.

    The connection or cursor passed to the function will be used to query the
    database and look for the OID of the |hstore| type (which may be different
    across databases). If querying is not desirable (e.g. with
    :ref:`asynchronous connections <async-support>`) you may specify it in the
    *oid* parameter, which can be found using a query such as :sql:`SELECT
    'hstore'::regtype::oid`. Analogously you can obtain a value for *array_oid*
    using a query such as :sql:`SELECT 'hstore[]'::regtype::oid`.

    Note that, when passing a dictionary from Python to the database, both
    strings and unicode keys and values are supported. Dictionaries returned
    from the database have keys/values according to the *unicode* parameter.

    The |hstore| contrib module must be already installed in the database
    (executing the ``hstore.sql`` script in your ``contrib`` directory).
    Raise `~psycopg2.ProgrammingError` if the type is not found.
    """
    if oid is None:
        # Ask the database for the (possibly several) hstore oids.
        oid = HstoreAdapter.get_oids(conn_or_curs)
        if oid is None or not oid[0]:
            raise psycopg2.ProgrammingError(
                "hstore type not found in the database. "
                "please install it from your 'contrib/hstore.sql' file")
        else:
            array_oid = oid[1]
            oid = oid[0]

    if isinstance(oid, int):
        oid = (oid,)

    if array_oid is not None:
        if isinstance(array_oid, int):
            array_oid = (array_oid,)
        else:
            # Drop falsy entries (e.g. NULL typarray on old servers).
            array_oid = tuple(x for x in array_oid if x)

    # create and register the typecaster
    HSTORE = _ext.new_type(oid, "HSTORE", HstoreAdapter.parse)
    _ext.register_type(HSTORE, not globally and conn_or_curs or None)
    _ext.register_adapter(dict, HstoreAdapter)

    if array_oid:
        HSTOREARRAY = _ext.new_array_type(array_oid, "HSTOREARRAY", HSTORE)
        _ext.register_type(HSTOREARRAY, not globally and conn_or_curs or None)

982 

983 

class CompositeCaster:
    """Helps conversion of a PostgreSQL composite type into a Python object.

    The class is usually created by the `register_composite()` function.
    You may want to create and register manually instances of the class if
    querying the database at registration time is not desirable (such as when
    using an :ref:`asynchronous connections <async-support>`).

    """
    def __init__(self, name, oid, attrs, array_oid=None, schema=None):
        """
        :param name: name of the composite type.
        :param oid: oid of the composite type.
        :param attrs: sequence of (attribute name, attribute type oid) pairs.
        :param array_oid: oid of the array of the composite type, if any.
        :param schema: schema the type belongs to, if known.
        """
        self.name = name
        self.schema = schema
        self.oid = oid
        self.array_oid = array_oid

        self.attnames = [a[0] for a in attrs]
        self.atttypes = [a[1] for a in attrs]
        self._create_type(name, self.attnames)
        self.typecaster = _ext.new_type((oid,), name, self.parse)
        if array_oid:
            self.array_typecaster = _ext.new_array_type(
                (array_oid,), f"{name}ARRAY", self.typecaster)
        else:
            self.array_typecaster = None

    def parse(self, s, curs):
        """Parse the PostgreSQL text representation *s* of a composite value.

        Each component is cast through *curs* to its Python representation,
        then assembled by `make()`. Return `!None` for a SQL NULL.

        Raise `~psycopg2.DataError` if the number of components doesn't match
        the registered type definition.
        """
        if s is None:
            return None

        tokens = self.tokenize(s)
        if len(tokens) != len(self.atttypes):
            raise psycopg2.DataError(
                "expecting %d components for the type %s, %d found instead" %
                (len(self.atttypes), self.name, len(tokens)))

        values = [curs.cast(oid, token)
                  for oid, token in zip(self.atttypes, tokens)]

        return self.make(values)

    def make(self, values):
        """Return a new Python object representing the data being casted.

        *values* is the list of attributes, already casted into their Python
        representation.

        You can subclass this method to :ref:`customize the composite cast
        <custom-composite>`.
        """

        return self._ctor(values)

    # One component of the composite text representation at a time:
    # either an empty token (NULL), a double-quoted string, or a bare string.
    _re_tokenize = _re.compile(r"""
  \(? ([,)])                        # an empty token, representing NULL
| \(? " ((?: [^"] | "")*) " [,)]    # or a quoted string
| \(? ([^",)]+) [,)]                # or an unquoted string
    """, _re.VERBOSE)

    # Undo the doubling of quotes and backslashes inside quoted strings.
    _re_undouble = _re.compile(r'(["\\])\1')

    @classmethod
    def tokenize(cls, s):
        """Split the composite representation *s* into component strings.

        NULL components are returned as `!None`; quoted components have
        their doubled quotes/backslashes undone.
        """
        # finditer() only yields match objects, so each match is valid;
        # an unparsable input simply produces fewer tokens, which parse()
        # reports as a DataError.
        rv = []
        for m in cls._re_tokenize.finditer(s):
            if m.group(1) is not None:
                rv.append(None)
            elif m.group(2) is not None:
                rv.append(cls._re_undouble.sub(r"\1", m.group(2)))
            else:
                rv.append(m.group(3))

        return rv

    def _create_type(self, name, attnames):
        # Sanitize the PostgreSQL type name into a valid Python identifier
        # before using it as the namedtuple type name.
        name = _re_clean.sub('_', name)
        self.type = namedtuple(name, attnames)
        self._ctor = self.type._make

    @classmethod
    def _from_db(cls, name, conn_or_curs):
        """Return a `CompositeCaster` instance for the type *name*.

        Raise `ProgrammingError` if the type is not found.
        """
        conn, curs = _solve_conn_curs(conn_or_curs)

        # Store the transaction status of the connection to revert it after use
        conn_status = conn.status

        # Use the correct schema
        if '.' in name:
            schema, tname = name.split('.', 1)
        else:
            tname = name
            schema = 'public'

        # column typarray not available before PG 8.3
        typarray = conn.info.server_version >= 80300 and "typarray" or "NULL"

        # get the type oid and attributes
        curs.execute("""\
SELECT t.oid, %s, attname, atttypid
FROM pg_type t
JOIN pg_namespace ns ON typnamespace = ns.oid
JOIN pg_attribute a ON attrelid = typrelid
WHERE typname = %%s AND nspname = %%s
  AND attnum > 0 AND NOT attisdropped
ORDER BY attnum;
""" % typarray, (tname, schema))

        recs = curs.fetchall()

        if not recs:
            # The above algorithm doesn't work for customized search_path
            # (#1487) The implementation below works better, but, to guarantee
            # backwards compatibility, use it only if the original one failed.
            try:
                savepoint = False
                # Because we executed statements earlier, we are either INTRANS
                # or we are IDLE only if the transaction is autocommit, in
                # which case we don't need the savepoint anyway.
                if conn.status == _ext.STATUS_IN_TRANSACTION:
                    curs.execute("SAVEPOINT register_type")
                    savepoint = True

                curs.execute("""\
SELECT t.oid, %s, attname, atttypid, typname, nspname
FROM pg_type t
JOIN pg_namespace ns ON typnamespace = ns.oid
JOIN pg_attribute a ON attrelid = typrelid
WHERE t.oid = %%s::regtype
  AND attnum > 0 AND NOT attisdropped
ORDER BY attnum;
""" % typarray, (name, ))
            except psycopg2.ProgrammingError:
                pass
            else:
                recs = curs.fetchall()
                if recs:
                    tname = recs[0][4]
                    schema = recs[0][5]
            finally:
                if savepoint:
                    curs.execute("ROLLBACK TO SAVEPOINT register_type")

        # revert the status of the connection as before the command
        if conn_status != _ext.STATUS_IN_TRANSACTION and not conn.autocommit:
            conn.rollback()

        if not recs:
            raise psycopg2.ProgrammingError(
                f"PostgreSQL type '{name}' not found")

        type_oid = recs[0][0]
        array_oid = recs[0][1]
        type_attrs = [(r[2], r[3]) for r in recs]

        return cls(tname, type_oid, type_attrs,
                   array_oid=array_oid, schema=schema)

1145 

1146 

def register_composite(name, conn_or_curs, globally=False, factory=None):
    """Register a typecaster to convert a composite type into a tuple.

    :param name: the name of a PostgreSQL composite type, e.g. created using
        the |CREATE TYPE|_ command
    :param conn_or_curs: a connection or cursor used to find the type oid and
        components; the typecaster is registered in a scope limited to this
        object, unless *globally* is set to `!True`
    :param globally: if `!False` (default) register the typecaster only on
        *conn_or_curs*, otherwise register it globally
    :param factory: if specified it should be a `CompositeCaster` subclass: use
        it to :ref:`customize how to cast composite types <custom-composite>`
    :return: the registered `CompositeCaster` or *factory* instance
        responsible for the conversion
    """
    if factory is None:
        factory = CompositeCaster

    # Query the database for the type definition and build the caster.
    caster = factory._from_db(name, conn_or_curs)

    # Register on the given connection/cursor only, unless a global
    # registration was requested.
    scope = not globally and conn_or_curs or None
    _ext.register_type(caster.typecaster, scope)
    if caster.array_typecaster is not None:
        _ext.register_type(caster.array_typecaster, scope)

    return caster

1173 

1174 

1175def _paginate(seq, page_size): 

1176 """Consume an iterable and return it in chunks. 

1177 

1178 Every chunk is at most `page_size`. Never return an empty chunk. 

1179 """ 

1180 page = [] 

1181 it = iter(seq) 

1182 while True: 

1183 try: 

1184 for i in range(page_size): 

1185 page.append(next(it)) 

1186 yield page 

1187 page = [] 

1188 except StopIteration: 

1189 if page: 

1190 yield page 

1191 return 

1192 

1193 

def execute_batch(cur, sql, argslist, page_size=100):
    r"""Execute groups of statements in fewer server roundtrips.

    Execute *sql* several times, against all parameters set (sequences or
    mappings) found in *argslist*.

    The function is semantically similar to

    .. parsed-literal::

        *cur*\.\ `~cursor.executemany`\ (\ *sql*\ , *argslist*\ )

    but has a different implementation: Psycopg will join the statements into
    fewer multi-statement commands, each one containing at most *page_size*
    statements, resulting in a reduced number of server roundtrips.

    After the execution of the function the `cursor.rowcount` property will
    **not** contain a total result.

    """
    for chunk in _paginate(argslist, page_size=page_size):
        # Merge every parameter set into the statement, then ship the whole
        # chunk to the server as one semicolon-joined command.
        statements = [cur.mogrify(sql, params) for params in chunk]
        cur.execute(b";".join(statements))

1217 

1218 

def execute_values(cur, sql, argslist, template=None, page_size=100, fetch=False):
    '''Execute a statement using :sql:`VALUES` with a sequence of parameters.

    :param cur: the cursor to use to execute the query.

    :param sql: the query to execute. It must contain a single ``%s``
        placeholder, which will be replaced by a `VALUES list`__.
        Example: ``"INSERT INTO mytable (id, f1, f2) VALUES %s"``.

    :param argslist: sequence of sequences or dictionaries with the arguments
        to send to the query. The type and content must be consistent with
        *template*.

    :param template: the snippet to merge to every item in *argslist* to
        compose the query.

        - If the *argslist* items are sequences it should contain positional
          placeholders (e.g. ``"(%s, %s, %s)"``, or ``"(%s, %s, 42)``" if there
          are constants value...).

        - If the *argslist* items are mappings it should contain named
          placeholders (e.g. ``"(%(id)s, %(f1)s, 42)"``).

        If not specified, assume the arguments are sequence and use a simple
        positional template (i.e. ``(%s, %s, ...)``), with the number of
        placeholders sniffed by the first element in *argslist*.

    :param page_size: maximum number of *argslist* items to include in every
        statement. If there are more items the function will execute more than
        one statement.

    :param fetch: if `!True` return the query results into a list (like in a
        `~cursor.fetchall()`). Useful for queries with :sql:`RETURNING`
        clause.

    .. __: https://www.postgresql.org/docs/current/static/queries-values.html

    After the execution of the function the `cursor.rowcount` property will
    **not** contain a total result.

    While :sql:`INSERT` is an obvious candidate for this function it is
    possible to use it with other statements, for example::

        >>> cur.execute(
        ... "create table test (id int primary key, v1 int, v2 int)")

        >>> execute_values(cur,
        ... "INSERT INTO test (id, v1, v2) VALUES %s",
        ... [(1, 2, 3), (4, 5, 6), (7, 8, 9)])

        >>> execute_values(cur,
        ... """UPDATE test SET v1 = data.v1 FROM (VALUES %s) AS data (id, v1)
        ... WHERE test.id = data.id""",
        ... [(1, 20), (4, 50)])

        >>> cur.execute("select * from test order by id")
        >>> cur.fetchall()
        [(1, 20, 3), (4, 50, 6), (7, 8, 9)])

    '''
    # Imported lazily to avoid a circular import with psycopg2.sql.
    from psycopg2.sql import Composable
    if isinstance(sql, Composable):
        sql = sql.as_string(cur)

    # we can't just use sql % vals because vals is bytes: if sql is bytes
    # there will be some decoding error because of stupid codec used, and Py3
    # doesn't implement % on bytes.
    if not isinstance(sql, bytes):
        sql = sql.encode(_ext.encodings[cur.connection.encoding])
    # pre/post are lists of byte snippets around the single %s placeholder.
    pre, post = _split_sql(sql)

    result = [] if fetch else None
    for page in _paginate(argslist, page_size=page_size):
        if template is None:
            # Sniff the number of placeholders from the first item; the
            # template is computed once and reused for every following page.
            template = b'(' + b','.join([b'%s'] * len(page[0])) + b')'
        # Copy the prefix so each page's statement starts from a fresh list.
        parts = pre[:]
        for args in page:
            parts.append(cur.mogrify(template, args))
            parts.append(b',')
        # Replace the trailing comma with the tail of the query.
        parts[-1:] = post
        cur.execute(b''.join(parts))
        if fetch:
            result.extend(cur.fetchall())

    return result

1304 

1305 

1306def _split_sql(sql): 

1307 """Split *sql* on a single ``%s`` placeholder. 

1308 

1309 Split on the %s, perform %% replacement and return pre, post lists of 

1310 snippets. 

1311 """ 

1312 curr = pre = [] 

1313 post = [] 

1314 tokens = _re.split(br'(%.)', sql) 

1315 for token in tokens: 

1316 if len(token) != 2 or token[:1] != b'%': 

1317 curr.append(token) 

1318 continue 

1319 

1320 if token[1:] == b's': 

1321 if curr is pre: 

1322 curr = post 

1323 else: 

1324 raise ValueError( 

1325 "the query contains more than one '%s' placeholder") 

1326 elif token[1:] == b'%': 

1327 curr.append(b'%') 

1328 else: 

1329 raise ValueError("unsupported format character: '%s'" 

1330 % token[1:].decode('ascii', 'replace')) 

1331 

1332 if curr is pre: 

1333 raise ValueError("the query doesn't contain any '%s' placeholder") 

1334 

1335 return pre, post 

1336 

1337 

# ascii except alnum and underscore: matches printable ASCII characters that
# cannot appear in a Python identifier. Used by CompositeCaster._create_type()
# to turn a PostgreSQL type name into a valid namedtuple type name.
_re_clean = _re.compile(
    '[' + _re.escape(' !"#$%&\'()*+,-./:;<=>?@[\\]^`{|}~') + ']')