# Extraction artifact: this file was recovered from a coverage.py v6.4.4 HTML
# report (psycopg2/extras.py, 644 statements, 21% covered, 2023-07-17).
# Report navigation chrome removed; content below is the module source.
1"""Miscellaneous goodies for psycopg2
3This module is a generic place used to hold little helper functions
4and classes until a better place in the distribution is found.
5"""
6# psycopg/extras.py - miscellaneous extra goodies for psycopg
7#
8# Copyright (C) 2003-2019 Federico Di Gregorio <fog@debian.org>
9# Copyright (C) 2020-2021 The Psycopg Team
10#
11# psycopg2 is free software: you can redistribute it and/or modify it
12# under the terms of the GNU Lesser General Public License as published
13# by the Free Software Foundation, either version 3 of the License, or
14# (at your option) any later version.
15#
16# In addition, as a special exception, the copyright holders give
17# permission to link this program with the OpenSSL library (or with
18# modified versions of OpenSSL that use the same license as OpenSSL),
19# and distribute linked combinations including the two.
20#
21# You must obey the GNU Lesser General Public License in all respects for
22# all of the code used other than OpenSSL.
23#
24# psycopg2 is distributed in the hope that it will be useful, but WITHOUT
25# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
26# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
27# License for more details.
29import os as _os
30import time as _time
31import re as _re
32from collections import namedtuple, OrderedDict
34import logging as _logging
36import psycopg2
37from psycopg2 import extensions as _ext
38from .extensions import cursor as _cursor
39from .extensions import connection as _connection
40from .extensions import adapt as _A, quote_ident
41from functools import lru_cache
43from psycopg2._psycopg import ( # noqa
44 REPLICATION_PHYSICAL, REPLICATION_LOGICAL,
45 ReplicationConnection as _replicationConnection,
46 ReplicationCursor as _replicationCursor,
47 ReplicationMessage)
50# expose the json adaptation stuff into the module
51from psycopg2._json import ( # noqa
52 json, Json, register_json, register_default_json, register_default_jsonb)
55# Expose range-related objects
56from psycopg2._range import ( # noqa
57 Range, NumericRange, DateRange, DateTimeRange, DateTimeTZRange,
58 register_range, RangeAdapter, RangeCaster)
61# Expose ipaddress-related objects
62from psycopg2._ipaddress import register_ipaddress # noqa
class DictCursorBase(_cursor):
    """Base class for all dict-like cursors.

    Subclasses must provide a ``row_factory`` keyword argument (they do so
    in their own ``__init__``) and implement ``_build_index()``, which maps
    column names to positions after a query has been executed.
    """

    def __init__(self, *args, **kwargs):
        # A row factory is mandatory: concrete subclasses (DictCursor,
        # RealDictCursor) inject it before delegating here.
        if 'row_factory' in kwargs:
            row_factory = kwargs['row_factory']
            del kwargs['row_factory']
        else:
            raise NotImplementedError(
                "DictCursorBase can't be instantiated without a row factory.")
        super().__init__(*args, **kwargs)
        # True between execute()/callproc() and the first fetch; tells the
        # fetch methods that _build_index() must run for this result set.
        self._query_executed = False
        # When True, rows are fetched *before* the index is built (set by
        # subclasses whose index must be built after the first fetch).
        self._prefetch = False
        self.row_factory = row_factory

    def fetchone(self):
        # Fetch either before or after building the column index, depending
        # on self._prefetch; exactly one of the two branches fetches.
        if self._prefetch:
            res = super().fetchone()
        if self._query_executed:
            self._build_index()
        if not self._prefetch:
            res = super().fetchone()
        return res

    def fetchmany(self, size=None):
        if self._prefetch:
            res = super().fetchmany(size)
        if self._query_executed:
            self._build_index()
        if not self._prefetch:
            res = super().fetchmany(size)
        return res

    def fetchall(self):
        if self._prefetch:
            res = super().fetchall()
        if self._query_executed:
            self._build_index()
        if not self._prefetch:
            res = super().fetchall()
        return res

    def __iter__(self):
        # Same prefetch dance as the fetch*() methods, but the first row must
        # be pulled eagerly so the index can be built before it is yielded.
        try:
            if self._prefetch:
                res = super().__iter__()
                first = next(res)
            if self._query_executed:
                self._build_index()
            if not self._prefetch:
                res = super().__iter__()
                first = next(res)

            yield first
            while True:
                yield next(res)
        except StopIteration:
            # An empty result set: next(res) raised; end the generator.
            return
class DictConnection(_connection):
    """A connection whose cursors are `DictCursor` by default."""

    def cursor(self, *args, **kwargs):
        # An explicitly requested factory wins; otherwise prefer the one
        # configured on the connection, falling back to DictCursor.
        if 'cursor_factory' not in kwargs:
            kwargs['cursor_factory'] = self.cursor_factory or DictCursor
        return super().cursor(*args, **kwargs)
class DictCursor(DictCursorBase):
    """A cursor that keeps a list of column name -> index mappings__.

    .. __: https://docs.python.org/glossary.html#term-mapping
    """

    def __init__(self, *args, **kwargs):
        kwargs['row_factory'] = DictRow
        super().__init__(*args, **kwargs)
        # The name->position index can only be built once rows (and thus the
        # description) are available, so rows are fetched first.
        self._prefetch = True

    def execute(self, query, vars=None):
        self.index = OrderedDict()
        self._query_executed = True
        return super().execute(query, vars)

    def callproc(self, procname, vars=None):
        self.index = OrderedDict()
        self._query_executed = True
        return super().callproc(procname, vars)

    def _build_index(self):
        # Map each column name to its position in the row.
        if self._query_executed and self.description:
            for pos, col in enumerate(self.description):
                self.index[col[0]] = pos
            self._query_executed = False
class DictRow(list):
    """A row object that allow by-column-name access to data."""

    __slots__ = ('_index',)

    def __init__(self, cursor):
        # Share the cursor's name -> position mapping and pre-size the list
        # so the row can be populated positionally.
        self._index = cursor.index
        self[:] = [None] * len(cursor.description)

    def __getitem__(self, key):
        # Non-int, non-slice keys are column names: translate to a position.
        if not isinstance(key, (int, slice)):
            key = self._index[key]
        return super().__getitem__(key)

    def __setitem__(self, key, value):
        if not isinstance(key, (int, slice)):
            key = self._index[key]
        super().__setitem__(key, value)

    def items(self):
        # Bind the base-class accessor once and iterate names in order.
        fetch = super().__getitem__
        return ((name, fetch(self._index[name])) for name in self._index)

    def keys(self):
        return iter(self._index)

    def values(self):
        fetch = super().__getitem__
        return (fetch(self._index[name]) for name in self._index)

    def get(self, key, default=None):
        try:
            return self[key]
        except Exception:
            return default

    def copy(self):
        return OrderedDict(self.items())

    def __contains__(self, key):
        # Membership is by column name, not by value.
        return key in self._index

    def __reduce__(self):
        # this is apparently useless, but it fixes #1073
        return super().__reduce__()

    def __getstate__(self):
        return self[:], self._index.copy()

    def __setstate__(self, data):
        self[:] = data[0]
        self._index = data[1]
class RealDictConnection(_connection):
    """A connection whose cursors are `RealDictCursor` by default."""

    def cursor(self, *args, **kwargs):
        # Respect an explicit factory; otherwise fall back to the one set on
        # the connection, then to RealDictCursor.
        if 'cursor_factory' not in kwargs:
            kwargs['cursor_factory'] = self.cursor_factory or RealDictCursor
        return super().cursor(*args, **kwargs)
class RealDictCursor(DictCursorBase):
    """A cursor that uses a real dict as the base type for rows.

    Note that this cursor is extremely specialized and does not allow
    the normal access (using integer indices) to fetched data. If you need
    to access database rows both as a dictionary and a list, then use
    the generic `DictCursor` instead of `!RealDictCursor`.
    """

    def __init__(self, *args, **kwargs):
        kwargs['row_factory'] = RealDictRow
        super().__init__(*args, **kwargs)

    def _start_query(self):
        # Reset the column-name list; it is rebuilt from the description by
        # _build_index() once the query has run.
        self.column_mapping = []
        self._query_executed = True

    def execute(self, query, vars=None):
        self._start_query()
        return super().execute(query, vars)

    def callproc(self, procname, vars=None):
        self._start_query()
        return super().callproc(procname, vars)

    def _build_index(self):
        if self._query_executed and self.description:
            self.column_mapping = [col[0] for col in self.description]
            self._query_executed = False
class RealDictRow(OrderedDict):
    """A `!dict` subclass representing a data record."""

    def __init__(self, *args, **kwargs):
        cursor = None
        if args and isinstance(args[0], _cursor):
            cursor, args = args[0], args[1:]

        super().__init__(*args, **kwargs)

        if cursor is None:
            return

        # Required for named cursors
        if cursor.description and not cursor.column_mapping:
            cursor._build_index()

        # Store the cols mapping in the dict itself until the row is fully
        # populated, so we don't need to add attributes to the class
        # (hence keeping its maintenance, special pickle support, etc.)
        self[RealDictRow] = cursor.column_mapping

    def __setitem__(self, key, value):
        if RealDictRow not in self:
            # Normal dict behavior once (or when) the row is built.
            super().__setitem__(key, value)
            return

        # Row building phase: *key* is a position, translate it to the
        # column name; drop the mapping once the last column is stored.
        mapping = self[RealDictRow]
        super().__setitem__(mapping[key], value)
        if key == len(mapping) - 1:
            del self[RealDictRow]
class NamedTupleConnection(_connection):
    """A connection whose cursors are `NamedTupleCursor` by default."""

    def cursor(self, *args, **kwargs):
        # Respect an explicit factory; otherwise use the connection-level
        # one, then NamedTupleCursor.
        if 'cursor_factory' not in kwargs:
            kwargs['cursor_factory'] = self.cursor_factory or NamedTupleCursor
        return super().cursor(*args, **kwargs)
class NamedTupleCursor(_cursor):
    """A cursor that generates results as `~collections.namedtuple`.

    `!fetch*()` methods will return named tuples instead of regular tuples, so
    their elements can be accessed both as regular numeric items as well as
    attributes.

        >>> nt_cur = conn.cursor(cursor_factory=psycopg2.extras.NamedTupleCursor)
        >>> rec = nt_cur.fetchone()
        >>> rec
        Record(id=1, num=100, data="abc'def")
        >>> rec[1]
        100
        >>> rec.data
        "abc'def"
    """
    # The namedtuple class for the current result set; reset on each new
    # query and rebuilt lazily by the first fetch.
    Record = None
    # NOTE(review): MAX_CACHE appears unused in this chunk; the record-class
    # cache size is the lru_cache(512) attached at module level below —
    # confirm before relying on it.
    MAX_CACHE = 1024

    def execute(self, query, vars=None):
        self.Record = None
        return super().execute(query, vars)

    def executemany(self, query, vars):
        self.Record = None
        return super().executemany(query, vars)

    def callproc(self, procname, vars=None):
        self.Record = None
        return super().callproc(procname, vars)

    def fetchone(self):
        # Returns None (implicitly) when the result set is exhausted.
        t = super().fetchone()
        if t is not None:
            nt = self.Record
            if nt is None:
                nt = self.Record = self._make_nt()
            return nt._make(t)

    def fetchmany(self, size=None):
        ts = super().fetchmany(size)
        nt = self.Record
        if nt is None:
            nt = self.Record = self._make_nt()
        return list(map(nt._make, ts))

    def fetchall(self):
        ts = super().fetchall()
        nt = self.Record
        if nt is None:
            nt = self.Record = self._make_nt()
        return list(map(nt._make, ts))

    def __iter__(self):
        try:
            it = super().__iter__()
            # Pull the first row eagerly so the Record class can be built
            # from a populated description before anything is yielded.
            t = next(it)

            nt = self.Record
            if nt is None:
                nt = self.Record = self._make_nt()

            yield nt._make(t)

            while True:
                yield nt._make(next(it))
        except StopIteration:
            # Empty result set or iteration finished.
            return

    def _make_nt(self):
        # _cached_make_nt is attached to the class at module level so the
        # lru_cache size can be monkeypatched; the cache key is the tuple of
        # column names.
        key = tuple(d[0] for d in self.description) if self.description else ()
        return self._cached_make_nt(key)

    @classmethod
    def _do_make_nt(cls, key):
        fields = []
        for s in key:
            # Replace characters invalid in identifiers (per _re_clean,
            # defined elsewhere in this module) with underscores.
            s = _re_clean.sub('_', s)
            # Python identifier cannot start with numbers, namedtuple fields
            # cannot start with underscore. So...
            if s[0] == '_' or '0' <= s[0] <= '9':
                s = 'f' + s
            fields.append(s)

        nt = namedtuple("Record", fields)
        return nt
# Cache the namedtuple classes generated for result shapes: the same column
# lists repeat heavily across queries and namedtuple() creation is costly.
# The cache key is (cls, key), i.e. the cursor *class* and the column-name
# tuple, so no cursor instances are retained by the cache.
@lru_cache(512)
def _cached_make_nt(cls, key):
    # Delegate the actual class creation (name cleanup) to the cursor class.
    return cls._do_make_nt(key)


# Exposed for testability, and if someone wants to monkeypatch to tweak
# the cache size.
NamedTupleCursor._cached_make_nt = classmethod(_cached_make_nt)
class LoggingConnection(_connection):
    """A connection that logs all queries to a file or logger__ object.

    .. __: https://docs.python.org/library/logging.html
    """

    def initialize(self, logobj):
        """Initialize the connection to log to `!logobj`.

        The `!logobj` parameter can be an open file object or a Logger/LoggerAdapter
        instance from the standard logging module.
        """
        self._logobj = logobj
        # Pick the write strategy once, based on the sink's type.
        is_logger = _logging and isinstance(
            logobj, (_logging.Logger, _logging.LoggerAdapter))
        self.log = self._logtologger if is_logger else self._logtofile

    def filter(self, msg, curs):
        """Filter the query before logging it.

        This is the method to overwrite to filter unwanted queries out of the
        log or to add some extra data to the output. The default implementation
        just does nothing.
        """
        return msg

    def _logtofile(self, msg, curs):
        # File sink: decode bytes using the connection encoding and append
        # the platform line separator. A falsy filter() result suppresses
        # the entry.
        text = self.filter(msg, curs)
        if not text:
            return
        if isinstance(text, bytes):
            text = text.decode(_ext.encodings[self.encoding], 'replace')
        self._logobj.write(text + _os.linesep)

    def _logtologger(self, msg, curs):
        # Logger sink: emit at DEBUG level; a falsy filter() result
        # suppresses the entry.
        text = self.filter(msg, curs)
        if text:
            self._logobj.debug(text)

    def _check(self):
        if not hasattr(self, '_logobj'):
            raise self.ProgrammingError(
                "LoggingConnection object has not been initialize()d")

    def cursor(self, *args, **kwargs):
        self._check()
        if 'cursor_factory' not in kwargs:
            kwargs['cursor_factory'] = self.cursor_factory or LoggingCursor
        return super().cursor(*args, **kwargs)
class LoggingCursor(_cursor):
    """A cursor that logs queries using its connection logging facilities."""

    def _log_issued_query(self):
        # Hand the query actually sent to the server over to the
        # connection's configured log sink.
        self.connection.log(self.query, self)

    def execute(self, query, vars=None):
        # Log even when execution raises, hence the finally clause.
        try:
            return super().execute(query, vars)
        finally:
            self._log_issued_query()

    def callproc(self, procname, vars=None):
        try:
            return super().callproc(procname, vars)
        finally:
            self._log_issued_query()
class MinTimeLoggingConnection(LoggingConnection):
    """A connection that logs queries based on execution time.

    This is just an example of how to sub-class `LoggingConnection` to
    provide some extra filtering for the logged queries. Both the
    `initialize()` and `filter()` methods are overwritten to make sure
    that only queries executing for more than ``mintime`` ms are logged.

    Note that this connection uses the specialized cursor
    `MinTimeLoggingCursor`.
    """

    def initialize(self, logobj, mintime=0):
        LoggingConnection.initialize(self, logobj)
        self._mintime = mintime

    def filter(self, msg, curs):
        # Elapsed milliseconds since the cursor recorded its start time.
        t = (_time.time() - curs.timestamp) * 1000
        if t <= self._mintime:
            # Too fast to be interesting: suppress the log entry.
            return None
        if isinstance(msg, bytes):
            msg = msg.decode(_ext.encodings[self.encoding], 'replace')
        return f"{msg}{_os.linesep} (execution time: {t} ms)"

    def cursor(self, *args, **kwargs):
        if 'cursor_factory' not in kwargs:
            kwargs['cursor_factory'] = (
                self.cursor_factory or MinTimeLoggingCursor)
        return LoggingConnection.cursor(self, *args, **kwargs)
class MinTimeLoggingCursor(LoggingCursor):
    """The cursor sub-class companion to `MinTimeLoggingConnection`."""

    def execute(self, query, vars=None):
        # Record the start time; MinTimeLoggingConnection.filter() reads it
        # back from the cursor to compute the execution time.
        self.timestamp = _time.time()
        return LoggingCursor.execute(self, query, vars)

    def callproc(self, procname, vars=None):
        self.timestamp = _time.time()
        return LoggingCursor.callproc(self, procname, vars)
class LogicalReplicationConnection(_replicationConnection):
    """A replication connection preset for logical replication."""

    def __init__(self, *args, **kwargs):
        # Force the replication type, overriding any caller-supplied value.
        kwargs['replication_type'] = REPLICATION_LOGICAL
        super().__init__(*args, **kwargs)
class PhysicalReplicationConnection(_replicationConnection):
    """A replication connection preset for physical replication."""

    def __init__(self, *args, **kwargs):
        # Force the replication type, overriding any caller-supplied value.
        kwargs['replication_type'] = REPLICATION_PHYSICAL
        super().__init__(*args, **kwargs)
class StopReplication(Exception):
    """
    Exception used to break out of the endless loop in
    `~ReplicationCursor.consume_stream()`.

    Subclass of `~exceptions.Exception`. Intentionally *not* inherited from
    `~psycopg2.Error` as occurrence of this exception does not indicate an
    error.
    """
    pass
class ReplicationCursor(_replicationCursor):
    """A cursor used for communication on replication connections.

    The methods below build replication-protocol commands
    (CREATE_REPLICATION_SLOT / DROP_REPLICATION_SLOT / START_REPLICATION)
    as SQL text, quoting identifiers with `quote_ident`.
    """

    def create_replication_slot(self, slot_name, slot_type=None, output_plugin=None):
        """Create streaming replication slot.

        :param slot_name: name of the slot to create.
        :param slot_type: REPLICATION_LOGICAL or REPLICATION_PHYSICAL;
            defaults to the connection's replication type.
        :param output_plugin: decoding plugin name, required for (and only
            allowed with) logical slots.
        """

        command = f"CREATE_REPLICATION_SLOT {quote_ident(slot_name, self)} "

        # Default the slot type to the connection's replication type.
        if slot_type is None:
            slot_type = self.connection.replication_type

        if slot_type == REPLICATION_LOGICAL:
            if output_plugin is None:
                raise psycopg2.ProgrammingError(
                    "output plugin name is required to create "
                    "logical replication slot")

            command += f"LOGICAL {quote_ident(output_plugin, self)}"

        elif slot_type == REPLICATION_PHYSICAL:
            if output_plugin is not None:
                raise psycopg2.ProgrammingError(
                    "cannot specify output plugin name when creating "
                    "physical replication slot")

            command += "PHYSICAL"

        else:
            raise psycopg2.ProgrammingError(
                f"unrecognized replication type: {repr(slot_type)}")

        self.execute(command)

    def drop_replication_slot(self, slot_name):
        """Drop streaming replication slot."""

        command = f"DROP_REPLICATION_SLOT {quote_ident(slot_name, self)}"
        self.execute(command)

    def start_replication(
            self, slot_name=None, slot_type=None, start_lsn=0,
            timeline=0, options=None, decode=False, status_interval=10):
        """Start replication stream.

        :param start_lsn: either an int or an "XXX/XXX" hex string.
        :param options: dict of output plugin options (logical only).
        """

        command = "START_REPLICATION "

        if slot_type is None:
            slot_type = self.connection.replication_type

        if slot_type == REPLICATION_LOGICAL:
            # A slot is mandatory for logical replication.
            if slot_name:
                command += f"SLOT {quote_ident(slot_name, self)} "
            else:
                raise psycopg2.ProgrammingError(
                    "slot name is required for logical replication")

            command += "LOGICAL "

        elif slot_type == REPLICATION_PHYSICAL:
            if slot_name:
                command += f"SLOT {quote_ident(slot_name, self)} "
            # don't add "PHYSICAL", before 9.4 it was just START_REPLICATION XXX/XXX

        else:
            raise psycopg2.ProgrammingError(
                f"unrecognized replication type: {repr(slot_type)}")

        # Normalize the LSN to the server's XXX/XXX hex format, whether it
        # was given as a string or as a 64-bit integer.
        if type(start_lsn) is str:
            lsn = start_lsn.split('/')
            lsn = f"{int(lsn[0], 16):X}/{int(lsn[1], 16):08X}"
        else:
            lsn = f"{start_lsn >> 32 & 4294967295:X}/{start_lsn & 4294967295:08X}"

        command += lsn

        if timeline != 0:
            # Timelines only make sense for physical replication.
            if slot_type == REPLICATION_LOGICAL:
                raise psycopg2.ProgrammingError(
                    "cannot specify timeline for logical replication")

            command += f" TIMELINE {timeline}"

        if options:
            if slot_type == REPLICATION_PHYSICAL:
                raise psycopg2.ProgrammingError(
                    "cannot specify output plugin options for physical replication")

            # Render options as (key1 val1, key2 val2, ...) with quoted
            # identifiers and adapted values.
            command += " ("
            for k, v in options.items():
                if not command.endswith('('):
                    command += ", "
                command += f"{quote_ident(k, self)} {_A(str(v))}"
            command += ")"

        self.start_replication_expert(
            command, decode=decode, status_interval=status_interval)

    # allows replication cursors to be used in select.select() directly
    def fileno(self):
        return self.connection.fileno()
624# a dbtype and adapter for Python UUID type
class UUID_adapter:
    """Adapt Python's uuid.UUID__ type to PostgreSQL's uuid__.

    .. __: https://docs.python.org/library/uuid.html
    .. __: https://www.postgresql.org/docs/current/static/datatype-uuid.html
    """

    def __init__(self, uuid):
        self._uuid = uuid

    def __conform__(self, proto):
        # Declare that this object implements the ISQLQuote protocol itself.
        if proto is _ext.ISQLQuote:
            return self

    def __str__(self):
        return f"'{self._uuid}'::uuid"

    def getquoted(self):
        # The quoted SQL literal is just the textual form, utf8-encoded.
        return str(self).encode('utf8')
def register_uuid(oids=None, conn_or_curs=None):
    """Create the UUID type and an uuid.UUID adapter.

    :param oids: oid for the PostgreSQL :sql:`uuid` type, or 2-items sequence
        with oids of the type and the array. If not specified, use PostgreSQL
        standard oids.
    :param conn_or_curs: where to register the typecaster. If not specified,
        register it globally.
    """
    import uuid

    # Fix: the extracted source had coverage-report annotations fused into
    # this condition line, making it invalid Python; restored clean form.
    if not oids:
        # Standard PostgreSQL oids for uuid (2950) and uuid[] (2951).
        oid1 = 2950
        oid2 = 2951
    elif isinstance(oids, (list, tuple)):
        oid1, oid2 = oids
    else:
        oid1 = oids
        oid2 = 2951

    _ext.UUID = _ext.new_type((oid1, ), "UUID",
            lambda data, cursor: data and uuid.UUID(data) or None)
    _ext.UUIDARRAY = _ext.new_array_type((oid2,), "UUID[]", _ext.UUID)

    _ext.register_type(_ext.UUID, conn_or_curs)
    _ext.register_type(_ext.UUIDARRAY, conn_or_curs)
    _ext.register_adapter(uuid.UUID, UUID_adapter)

    return _ext.UUID
679# a type, dbtype and adapter for PostgreSQL inet type
class Inet:
    """Wrap a string to allow for correct SQL-quoting of inet values.

    Note that this adapter does NOT check the passed value to make
    sure it really is an inet-compatible address but DOES call adapt()
    on it to make sure it is impossible to execute an SQL-injection
    by passing an evil value to the initializer.
    """

    def __init__(self, addr):
        self.addr = addr

    def __repr__(self):
        return f"{self.__class__.__name__}({self.addr!r})"

    def __str__(self):
        return str(self.addr)

    def prepare(self, conn):
        # Remember the connection so the wrapped adapter can be prepared too.
        self._conn = conn

    def __conform__(self, proto):
        if proto is _ext.ISQLQuote:
            return self

    def getquoted(self):
        # Quote through the standard string adapter, then cast to inet.
        wrapped = _A(self.addr)
        if hasattr(wrapped, 'prepare'):
            wrapped.prepare(self._conn)
        return wrapped.getquoted() + b"::inet"
def register_inet(oid=None, conn_or_curs=None):
    """Create the INET type and an Inet adapter.

    :param oid: oid for the PostgreSQL :sql:`inet` type, or 2-items sequence
        with oids of the type and the array. If not specified, use PostgreSQL
        standard oids.
    :param conn_or_curs: where to register the typecaster. If not specified,
        register it globally.
    """
    import warnings
    warnings.warn(
        "the inet adapter is deprecated, it's not very useful",
        DeprecationWarning)

    # Resolve the scalar and array oids from the accepted argument forms.
    if not oid:
        oid1, oid2 = 869, 1041
    elif isinstance(oid, (list, tuple)):
        oid1, oid2 = oid
    else:
        oid1, oid2 = oid, 1041

    _ext.INET = _ext.new_type((oid1, ), "INET",
            lambda data, cursor: data and Inet(data) or None)
    _ext.INETARRAY = _ext.new_array_type((oid2, ), "INETARRAY", _ext.INET)

    _ext.register_type(_ext.INET, conn_or_curs)
    _ext.register_type(_ext.INETARRAY, conn_or_curs)

    return _ext.INET
def wait_select(conn):
    """Wait until a connection or cursor has data available.

    The function is an example of a wait callback to be registered with
    `~psycopg2.extensions.set_wait_callback()`. This function uses
    :py:func:`~select.select()` to wait for data to become available, and
    therefore is able to handle/receive SIGINT/KeyboardInterrupt.
    """
    import select
    from psycopg2.extensions import POLL_OK, POLL_READ, POLL_WRITE

    while True:
        try:
            state = conn.poll()
            if state == POLL_OK:
                # The connection is ready: nothing more to wait for.
                break
            elif state == POLL_READ:
                select.select([conn.fileno()], [], [])
            elif state == POLL_WRITE:
                select.select([], [conn.fileno()], [])
            else:
                raise conn.OperationalError(f"bad state from poll: {state}")
        except KeyboardInterrupt:
            # Ctrl-C: ask the backend to cancel the query and keep polling;
            # the loop will be broken by a server error
            conn.cancel()
            continue
def _solve_conn_curs(conn_or_curs):
    """Return the connection and a DBAPI cursor from a connection or cursor."""
    if conn_or_curs is None:
        raise psycopg2.ProgrammingError("no connection or cursor provided")

    # A cursor exposes execute(); climb to its connection in that case.
    conn = (conn_or_curs.connection if hasattr(conn_or_curs, 'execute')
            else conn_or_curs)
    # Always hand back a plain cursor, regardless of any custom factory.
    curs = conn.cursor(cursor_factory=_cursor)
    return conn, curs
class HstoreAdapter:
    """Adapt a Python dict to the hstore syntax."""
    def __init__(self, wrapped):
        # The dict to adapt.
        self.wrapped = wrapped

    def prepare(self, conn):
        self.conn = conn

        # use an old-style getquoted implementation if required
        if conn.info.server_version < 90000:
            self.getquoted = self._getquoted_8

    def _getquoted_8(self):
        """Use the operators available in PG pre-9.0."""
        if not self.wrapped:
            return b"''::hstore"

        adapt = _ext.adapt
        rv = []
        for k, v in self.wrapped.items():
            # Quote the key through the regular adaptation machinery.
            k = adapt(k)
            k.prepare(self.conn)
            k = k.getquoted()

            if v is not None:
                v = adapt(v)
                v.prepare(self.conn)
                v = v.getquoted()
            else:
                v = b'NULL'

            # XXX this b'ing is painfully inefficient!
            rv.append(b"(" + k + b" => " + v + b")")

        return b"(" + b'||'.join(rv) + b")"

    def _getquoted_9(self):
        """Use the hstore(text[], text[]) function."""
        if not self.wrapped:
            return b"''::hstore"

        # Build two parallel text arrays (keys, values) and let the server
        # assemble them into an hstore.
        k = _ext.adapt(list(self.wrapped.keys()))
        k.prepare(self.conn)
        v = _ext.adapt(list(self.wrapped.values()))
        v.prepare(self.conn)
        return b"hstore(" + k.getquoted() + b", " + v.getquoted() + b")"

    # Default strategy; prepare() may swap in _getquoted_8 per instance.
    getquoted = _getquoted_9

    _re_hstore = _re.compile(r"""
        # hstore key:
        # a string of normal or escaped chars
        "((?: [^"\\] | \\. )*)"
        \s*=>\s* # hstore value
        (?:
            NULL # the value can be null - not catched
            # or a quoted string like the key
            | "((?: [^"\\] | \\. )*)"
        )
        (?:\s*,\s*|$) # pairs separated by comma or end of string.
    """, _re.VERBOSE)

    @classmethod
    def parse(self, s, cur, _bsdec=_re.compile(r"\\(.)")):
        """Parse an hstore representation in a Python string.

        The hstore is represented as something like::

            "a"=>"1", "b"=>"2"

        with backslash-escaped strings.
        """
        # NOTE: classmethod whose first parameter is named *self* — kept for
        # backward compatibility with the historical signature.
        if s is None:
            return None

        rv = {}
        start = 0
        for m in self._re_hstore.finditer(s):
            # Matches must be contiguous from the previous one.
            if m is None or m.start() != start:
                raise psycopg2.InterfaceError(
                    f"error parsing hstore pair at char {start}")
            # Undo backslash escaping in keys and (non-NULL) values.
            k = _bsdec.sub(r'\1', m.group(1))
            v = m.group(2)
            if v is not None:
                v = _bsdec.sub(r'\1', v)

            rv[k] = v
            start = m.end()

        if start < len(s):
            raise psycopg2.InterfaceError(
                f"error parsing hstore: unparsed data after char {start}")

        return rv

    @classmethod
    def parse_unicode(self, s, cur):
        """Parse an hstore returning unicode keys and values."""
        if s is None:
            return None

        s = s.decode(_ext.encodings[cur.connection.encoding])
        return self.parse(s, cur)

    @classmethod
    def get_oids(self, conn_or_curs):
        """Return the lists of OID of the hstore and hstore[] types.
        """
        conn, curs = _solve_conn_curs(conn_or_curs)

        # Store the transaction status of the connection to revert it after use
        conn_status = conn.status

        # column typarray not available before PG 8.3
        typarray = conn.info.server_version >= 80300 and "typarray" or "NULL"

        rv0, rv1 = [], []

        # get the oid for the hstore
        # (typarray is either a column name or the literal NULL, both safe
        # to interpolate; no user input reaches this query)
        curs.execute(f"""SELECT t.oid, {typarray}
FROM pg_type t JOIN pg_namespace ns
    ON typnamespace = ns.oid
WHERE typname = 'hstore';
""")
        for oids in curs:
            rv0.append(oids[0])
            rv1.append(oids[1])

        # revert the status of the connection as before the command
        if (conn_status != _ext.STATUS_IN_TRANSACTION
        and not conn.autocommit):
            conn.rollback()

        return tuple(rv0), tuple(rv1)
def register_hstore(conn_or_curs, globally=False, unicode=False,
                    oid=None, array_oid=None):
    r"""Register adapter and typecaster for `!dict`\-\ |hstore| conversions.

    :param conn_or_curs: a connection or cursor: the typecaster will be
        registered only on this object unless *globally* is set to `!True`
    :param globally: register the adapter globally, not only on *conn_or_curs*
    :param unicode: if `!True`, keys and values returned from the database
        will be `!unicode` instead of `!str`. The option is not available on
        Python 3
    :param oid: the OID of the |hstore| type if known. If not, it will be
        queried on *conn_or_curs*.
    :param array_oid: the OID of the |hstore| array type if known. If not, it
        will be queried on *conn_or_curs*.

    The connection or cursor passed to the function will be used to query the
    database and look for the OID of the |hstore| type (which may be different
    across databases). If querying is not desirable (e.g. with
    :ref:`asynchronous connections <async-support>`) you may specify it in the
    *oid* parameter, which can be found using a query such as :sql:`SELECT
    'hstore'::regtype::oid`. Analogously you can obtain a value for *array_oid*
    using a query such as :sql:`SELECT 'hstore[]'::regtype::oid`.

    Note that, when passing a dictionary from Python to the database, both
    strings and unicode keys and values are supported. Dictionaries returned
    from the database have keys/values according to the *unicode* parameter.

    The |hstore| contrib module must be already installed in the database
    (executing the ``hstore.sql`` script in your ``contrib`` directory).
    Raise `~psycopg2.ProgrammingError` if the type is not found.
    """
    if oid is None:
        # Discover both the scalar and array oids from the database;
        # get_oids() returns two tuples (possibly empty on failure).
        oid = HstoreAdapter.get_oids(conn_or_curs)
        if oid is None or not oid[0]:
            raise psycopg2.ProgrammingError(
                "hstore type not found in the database. "
                "please install it from your 'contrib/hstore.sql' file")
        else:
            array_oid = oid[1]
            oid = oid[0]

    # new_type() wants tuples of oids.
    if isinstance(oid, int):
        oid = (oid,)

    if array_oid is not None:
        if isinstance(array_oid, int):
            array_oid = (array_oid,)
        else:
            # Drop falsy entries (e.g. NULL typarray on pre-8.3 servers).
            array_oid = tuple([x for x in array_oid if x])

    # create and register the typecaster
    HSTORE = _ext.new_type(oid, "HSTORE", HstoreAdapter.parse)
    _ext.register_type(HSTORE, not globally and conn_or_curs or None)
    _ext.register_adapter(dict, HstoreAdapter)

    if array_oid:
        HSTOREARRAY = _ext.new_array_type(array_oid, "HSTOREARRAY", HSTORE)
        _ext.register_type(HSTOREARRAY, not globally and conn_or_curs or None)
984class CompositeCaster:
985 """Helps conversion of a PostgreSQL composite type into a Python object.
987 The class is usually created by the `register_composite()` function.
988 You may want to create and register manually instances of the class if
989 querying the database at registration time is not desirable (such as when
990 using an :ref:`asynchronous connections <async-support>`).
992 """
993 def __init__(self, name, oid, attrs, array_oid=None, schema=None):
994 self.name = name
995 self.schema = schema
996 self.oid = oid
997 self.array_oid = array_oid
999 self.attnames = [a[0] for a in attrs]
1000 self.atttypes = [a[1] for a in attrs]
1001 self._create_type(name, self.attnames)
1002 self.typecaster = _ext.new_type((oid,), name, self.parse)
1003 if array_oid:
1004 self.array_typecaster = _ext.new_array_type(
1005 (array_oid,), f"{name}ARRAY", self.typecaster)
1006 else:
1007 self.array_typecaster = None
1009 def parse(self, s, curs):
1010 if s is None:
1011 return None
1013 tokens = self.tokenize(s)
1014 if len(tokens) != len(self.atttypes):
1015 raise psycopg2.DataError(
1016 "expecting %d components for the type %s, %d found instead" %
1017 (len(self.atttypes), self.name, len(tokens)))
1019 values = [curs.cast(oid, token)
1020 for oid, token in zip(self.atttypes, tokens)]
1022 return self.make(values)
1024 def make(self, values):
1025 """Return a new Python object representing the data being casted.
1027 *values* is the list of attributes, already casted into their Python
1028 representation.
1030 You can subclass this method to :ref:`customize the composite cast
1031 <custom-composite>`.
1032 """
1034 return self._ctor(values)
1036 _re_tokenize = _re.compile(r"""
1037 \(? ([,)]) # an empty token, representing NULL
1038| \(? " ((?: [^"] | "")*) " [,)] # or a quoted string
1039| \(? ([^",)]+) [,)] # or an unquoted string
1040 """, _re.VERBOSE)
1042 _re_undouble = _re.compile(r'(["\\])\1')
1044 @classmethod
1045 def tokenize(self, s):
1046 rv = []
1047 for m in self._re_tokenize.finditer(s):
1048 if m is None:
1049 raise psycopg2.InterfaceError(f"can't parse type: {s!r}")
1050 if m.group(1) is not None:
1051 rv.append(None)
1052 elif m.group(2) is not None:
1053 rv.append(self._re_undouble.sub(r"\1", m.group(2)))
1054 else:
1055 rv.append(m.group(3))
1057 return rv
1059 def _create_type(self, name, attnames):
1060 name = _re_clean.sub('_', name)
1061 self.type = namedtuple(name, attnames)
1062 self._ctor = self.type._make
    @classmethod
    def _from_db(self, name, conn_or_curs):
        """Return a `CompositeCaster` instance for the type *name*.

        *conn_or_curs* is a connection or cursor used to query the system
        catalogs for the type's oid, array oid, and attributes.

        Raise `ProgrammingError` if the type is not found.
        """
        conn, curs = _solve_conn_curs(conn_or_curs)

        # Store the transaction status of the connection to revert it after use
        conn_status = conn.status

        # Use the correct schema; an unqualified name is looked up in
        # 'public' by the first query below.
        if '.' in name:
            schema, tname = name.split('.', 1)
        else:
            tname = name
            schema = 'public'

        # column typarray not available before PG 8.3
        typarray = conn.info.server_version >= 80300 and "typarray" or "NULL"

        # get the type oid and attributes
        curs.execute("""\
SELECT t.oid, %s, attname, atttypid
FROM pg_type t
JOIN pg_namespace ns ON typnamespace = ns.oid
JOIN pg_attribute a ON attrelid = typrelid
WHERE typname = %%s AND nspname = %%s
  AND attnum > 0 AND NOT attisdropped
ORDER BY attnum;
""" % typarray, (tname, schema))

        recs = curs.fetchall()

        if not recs:
            # The above algorithm doesn't work for customized search_path
            # (#1487) The implementation below works better, but, to guarantee
            # backwards compatibility, use it only if the original one failed.
            try:
                savepoint = False
                # Because we executed statements earlier, we are either INTRANS
                # or we are IDLE only if the transaction is autocommit, in
                # which case we don't need the savepoint anyway.
                if conn.status == _ext.STATUS_IN_TRANSACTION:
                    curs.execute("SAVEPOINT register_type")
                    savepoint = True

                # Let the server resolve the name through the search_path
                # via ::regtype; this raises if the name is unknown, hence
                # the savepoint above to keep the transaction usable.
                curs.execute("""\
SELECT t.oid, %s, attname, atttypid, typname, nspname
FROM pg_type t
JOIN pg_namespace ns ON typnamespace = ns.oid
JOIN pg_attribute a ON attrelid = typrelid
WHERE t.oid = %%s::regtype
  AND attnum > 0 AND NOT attisdropped
ORDER BY attnum;
""" % typarray, (name, ))
            except psycopg2.ProgrammingError:
                pass
            else:
                recs = curs.fetchall()
                if recs:
                    # overwrite by the true name/schema resolved by the server
                    tname = recs[0][4]
                    schema = recs[0][5]
            finally:
                if savepoint:
                    curs.execute("ROLLBACK TO SAVEPOINT register_type")

        # revert the status of the connection as before the command
        if conn_status != _ext.STATUS_IN_TRANSACTION and not conn.autocommit:
            conn.rollback()

        if not recs:
            raise psycopg2.ProgrammingError(
                f"PostgreSQL type '{name}' not found")

        type_oid = recs[0][0]
        array_oid = recs[0][1]
        # one (attname, atttypid) pair per attribute, in attnum order
        type_attrs = [(r[2], r[3]) for r in recs]

        return self(tname, type_oid, type_attrs,
                    array_oid=array_oid, schema=schema)
def register_composite(name, conn_or_curs, globally=False, factory=None):
    """Register a typecaster to convert a composite type into a tuple.

    :param name: the name of a PostgreSQL composite type, e.g. created using
        the |CREATE TYPE|_ command
    :param conn_or_curs: a connection or cursor used to find the type oid and
        components; the typecaster is registered in a scope limited to this
        object, unless *globally* is set to `!True`
    :param globally: if `!False` (default) register the typecaster only on
        *conn_or_curs*, otherwise register it globally
    :param factory: if specified it should be a `CompositeCaster` subclass: use
        it to :ref:`customize how to cast composite types <custom-composite>`
    :return: the registered `CompositeCaster` or *factory* instance
        responsible for the conversion

    :raise ProgrammingError: if the type *name* is not found.
    """
    if factory is None:
        factory = CompositeCaster

    caster = factory._from_db(name, conn_or_curs)

    # Register on the single connection/cursor unless *globally* was
    # requested (replaces the dated `not globally and x or None` idiom,
    # computed once instead of twice).
    scope = None if globally else conn_or_curs

    _ext.register_type(caster.typecaster, scope)

    if caster.array_typecaster is not None:
        _ext.register_type(caster.array_typecaster, scope)

    return caster
1175def _paginate(seq, page_size):
1176 """Consume an iterable and return it in chunks.
1178 Every chunk is at most `page_size`. Never return an empty chunk.
1179 """
1180 page = []
1181 it = iter(seq)
1182 while True:
1183 try:
1184 for i in range(page_size):
1185 page.append(next(it))
1186 yield page
1187 page = []
1188 except StopIteration:
1189 if page:
1190 yield page
1191 return
def execute_batch(cur, sql, argslist, page_size=100):
    r"""Execute groups of statements in fewer server roundtrips.

    Execute *sql* several times, against all parameters set (sequences or
    mappings) found in *argslist*.

    The function is semantically similar to

    .. parsed-literal::

        *cur*\.\ `~cursor.executemany`\ (\ *sql*\ , *argslist*\ )

    but has a different implementation: Psycopg will join the statements into
    fewer multi-statement commands, each one containing at most *page_size*
    statements, resulting in a reduced number of server roundtrips.

    After the execution of the function the `cursor.rowcount` property will
    **not** contain a total result.

    """
    for page in _paginate(argslist, page_size=page_size):
        # Merge the parameters client-side and send the whole page as one
        # semicolon-separated multi-statement command.
        joined = b";".join(cur.mogrify(sql, args) for args in page)
        cur.execute(joined)
def execute_values(cur, sql, argslist, template=None, page_size=100, fetch=False):
    '''Execute a statement using :sql:`VALUES` with a sequence of parameters.

    :param cur: the cursor to use to execute the query.

    :param sql: the query to execute. It must contain a single ``%s``
        placeholder, which will be replaced by a `VALUES list`__.
        Example: ``"INSERT INTO mytable (id, f1, f2) VALUES %s"``.

    :param argslist: sequence of sequences or dictionaries with the arguments
        to send to the query. The type and content must be consistent with
        *template*.

    :param template: the snippet to merge to every item in *argslist* to
        compose the query.

        - If the *argslist* items are sequences it should contain positional
          placeholders (e.g. ``"(%s, %s, %s)"``, or ``"(%s, %s, 42)"`` if there
          are constant values).

        - If the *argslist* items are mappings it should contain named
          placeholders (e.g. ``"(%(id)s, %(f1)s, 42)"``).

        If not specified, assume the arguments are sequence and use a simple
        positional template (i.e. ``(%s, %s, ...)``), with the number of
        placeholders sniffed by the first element in *argslist*.

    :param page_size: maximum number of *argslist* items to include in every
        statement. If there are more items the function will execute more than
        one statement.

    :param fetch: if `!True` return the query results into a list (like in a
        `~cursor.fetchall()`). Useful for queries with :sql:`RETURNING`
        clause.

    .. __: https://www.postgresql.org/docs/current/static/queries-values.html

    After the execution of the function the `cursor.rowcount` property will
    **not** contain a total result.
    '''
    from psycopg2.sql import Composable
    if isinstance(sql, Composable):
        sql = sql.as_string(cur)

    # mogrify() produces bytes, so normalize the query to bytes as well:
    # Python 3 doesn't implement % between str and bytes, and decoding the
    # mogrified values would be fragile.
    if not isinstance(sql, bytes):
        sql = sql.encode(_ext.encodings[cur.connection.encoding])
    pre, post = _split_sql(sql)

    result = [] if fetch else None
    for page in _paginate(argslist, page_size=page_size):
        if template is None:
            # Sniff the placeholder count from the first record of the
            # first page; reused unchanged for all following pages.
            template = b'(' + b','.join([b'%s'] * len(page[0])) + b')'
        # One "(v1, v2, ...)" group per record, comma-separated, spliced
        # where the single %s placeholder was.
        values = b','.join(cur.mogrify(template, args) for args in page)
        cur.execute(b''.join(pre) + values + b''.join(post))
        if fetch:
            result.extend(cur.fetchall())

    return result
1306def _split_sql(sql):
1307 """Split *sql* on a single ``%s`` placeholder.
1309 Split on the %s, perform %% replacement and return pre, post lists of
1310 snippets.
1311 """
1312 curr = pre = []
1313 post = []
1314 tokens = _re.split(br'(%.)', sql)
1315 for token in tokens:
1316 if len(token) != 2 or token[:1] != b'%':
1317 curr.append(token)
1318 continue
1320 if token[1:] == b's':
1321 if curr is pre:
1322 curr = post
1323 else:
1324 raise ValueError(
1325 "the query contains more than one '%s' placeholder")
1326 elif token[1:] == b'%':
1327 curr.append(b'%')
1328 else:
1329 raise ValueError("unsupported format character: '%s'"
1330 % token[1:].decode('ascii', 'replace'))
1332 if curr is pre:
1333 raise ValueError("the query doesn't contain any '%s' placeholder")
1335 return pre, post
# Printable ASCII characters that are NOT valid in a Python identifier
# (everything except alphanumerics and underscore): CompositeCaster
# substitutes them away before using a type name as a namedtuple class name.
_re_clean = _re.compile(
    '[' + _re.escape(' !"#$%&\'()*+,-./:;<=>?@[\\]^`{|}~') + ']')