Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/db/backends/postgresql/base.py: 68%
170 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1"""
2PostgreSQL database backend for Django.
4Requires psycopg 2: https://www.psycopg.org/
5"""
7import asyncio
8import threading
9import warnings
10from contextlib import contextmanager
12from django.conf import settings
13from django.core.exceptions import ImproperlyConfigured
14from django.db import DatabaseError as WrappedDatabaseError
15from django.db import connections
16from django.db.backends.base.base import BaseDatabaseWrapper
17from django.db.backends.utils import CursorDebugWrapper as BaseCursorDebugWrapper
18from django.utils.asyncio import async_unsafe
19from django.utils.functional import cached_property
20from django.utils.safestring import SafeString
21from django.utils.version import get_version_tuple
23try:
24 import psycopg2 as Database
25 import psycopg2.extensions
26 import psycopg2.extras
27except ImportError as e:
28 raise ImproperlyConfigured("Error loading psycopg2 module: %s" % e)
31def psycopg2_version():
32 version = psycopg2.__version__.split(" ", 1)[0]
33 return get_version_tuple(version)
36PSYCOPG2_VERSION = psycopg2_version()
38if PSYCOPG2_VERSION < (2, 8, 4): 38 ↛ 39line 38 didn't jump to line 39, because the condition on line 38 was never true
39 raise ImproperlyConfigured(
40 "psycopg2 version 2.8.4 or newer is required; you have %s"
41 % psycopg2.__version__
42 )
45# Some of these import psycopg2, so import them after checking if it's installed.
46from .client import DatabaseClient # NOQA
47from .creation import DatabaseCreation # NOQA
48from .features import DatabaseFeatures # NOQA
49from .introspection import DatabaseIntrospection # NOQA
50from .operations import DatabaseOperations # NOQA
51from .schema import DatabaseSchemaEditor # NOQA
53psycopg2.extensions.register_adapter(SafeString, psycopg2.extensions.QuotedString)
54psycopg2.extras.register_uuid()
56# Register support for inet[] manually so we don't have to handle the Inet()
57# object on load all the time.
58INETARRAY_OID = 1041
59INETARRAY = psycopg2.extensions.new_array_type(
60 (INETARRAY_OID,),
61 "INETARRAY",
62 psycopg2.extensions.UNICODE,
63)
64psycopg2.extensions.register_type(INETARRAY)
67class DatabaseWrapper(BaseDatabaseWrapper):
68 vendor = "postgresql"
69 display_name = "PostgreSQL"
70 # This dictionary maps Field objects to their associated PostgreSQL column
71 # types, as strings. Column-type strings can contain format strings; they'll
72 # be interpolated against the values of Field.__dict__ before being output.
73 # If a column type is set to None, it won't be included in the output.
74 data_types = {
75 "AutoField": "serial",
76 "BigAutoField": "bigserial",
77 "BinaryField": "bytea",
78 "BooleanField": "boolean",
79 "CharField": "varchar(%(max_length)s)",
80 "DateField": "date",
81 "DateTimeField": "timestamp with time zone",
82 "DecimalField": "numeric(%(max_digits)s, %(decimal_places)s)",
83 "DurationField": "interval",
84 "FileField": "varchar(%(max_length)s)",
85 "FilePathField": "varchar(%(max_length)s)",
86 "FloatField": "double precision",
87 "IntegerField": "integer",
88 "BigIntegerField": "bigint",
89 "IPAddressField": "inet",
90 "GenericIPAddressField": "inet",
91 "JSONField": "jsonb",
92 "OneToOneField": "integer",
93 "PositiveBigIntegerField": "bigint",
94 "PositiveIntegerField": "integer",
95 "PositiveSmallIntegerField": "smallint",
96 "SlugField": "varchar(%(max_length)s)",
97 "SmallAutoField": "smallserial",
98 "SmallIntegerField": "smallint",
99 "TextField": "text",
100 "TimeField": "time",
101 "UUIDField": "uuid",
102 }
103 data_type_check_constraints = {
104 "PositiveBigIntegerField": '"%(column)s" >= 0',
105 "PositiveIntegerField": '"%(column)s" >= 0',
106 "PositiveSmallIntegerField": '"%(column)s" >= 0',
107 }
108 operators = {
109 "exact": "= %s",
110 "iexact": "= UPPER(%s)",
111 "contains": "LIKE %s",
112 "icontains": "LIKE UPPER(%s)",
113 "regex": "~ %s",
114 "iregex": "~* %s",
115 "gt": "> %s",
116 "gte": ">= %s",
117 "lt": "< %s",
118 "lte": "<= %s",
119 "startswith": "LIKE %s",
120 "endswith": "LIKE %s",
121 "istartswith": "LIKE UPPER(%s)",
122 "iendswith": "LIKE UPPER(%s)",
123 }
125 # The patterns below are used to generate SQL pattern lookup clauses when
126 # the right-hand side of the lookup isn't a raw string (it might be an expression
127 # or the result of a bilateral transformation).
128 # In those cases, special characters for LIKE operators (e.g. \, *, _) should be
129 # escaped on database side.
130 #
131 # Note: we use str.format() here for readability as '%' is used as a wildcard for
132 # the LIKE operator.
133 pattern_esc = (
134 r"REPLACE(REPLACE(REPLACE({}, E'\\', E'\\\\'), E'%%', E'\\%%'), E'_', E'\\_')"
135 )
136 pattern_ops = {
137 "contains": "LIKE '%%' || {} || '%%'",
138 "icontains": "LIKE '%%' || UPPER({}) || '%%'",
139 "startswith": "LIKE {} || '%%'",
140 "istartswith": "LIKE UPPER({}) || '%%'",
141 "endswith": "LIKE '%%' || {}",
142 "iendswith": "LIKE '%%' || UPPER({})",
143 }
145 Database = Database
146 SchemaEditorClass = DatabaseSchemaEditor
147 # Classes instantiated in __init__().
148 client_class = DatabaseClient
149 creation_class = DatabaseCreation
150 features_class = DatabaseFeatures
151 introspection_class = DatabaseIntrospection
152 ops_class = DatabaseOperations
153 # PostgreSQL backend-specific attributes.
154 _named_cursor_idx = 0
156 def get_connection_params(self):
157 settings_dict = self.settings_dict
158 # None may be used to connect to the default 'postgres' db
159 if settings_dict["NAME"] == "" and not settings_dict.get("OPTIONS", {}).get( 159 ↛ 162line 159 didn't jump to line 162, because the condition on line 159 was never true
160 "service"
161 ):
162 raise ImproperlyConfigured(
163 "settings.DATABASES is improperly configured. "
164 "Please supply the NAME or OPTIONS['service'] value."
165 )
166 if len(settings_dict["NAME"] or "") > self.ops.max_name_length(): 166 ↛ 167line 166 didn't jump to line 167, because the condition on line 166 was never true
167 raise ImproperlyConfigured(
168 "The database name '%s' (%d characters) is longer than "
169 "PostgreSQL's limit of %d characters. Supply a shorter NAME "
170 "in settings.DATABASES."
171 % (
172 settings_dict["NAME"],
173 len(settings_dict["NAME"]),
174 self.ops.max_name_length(),
175 )
176 )
177 conn_params = {}
178 if settings_dict["NAME"]:
179 conn_params = {
180 "database": settings_dict["NAME"],
181 **settings_dict["OPTIONS"],
182 }
183 elif settings_dict["NAME"] is None: 183 ↛ 188line 183 didn't jump to line 188, because the condition on line 183 was never false
184 # Connect to the default 'postgres' db.
185 settings_dict.get("OPTIONS", {}).pop("service", None)
186 conn_params = {"database": "postgres", **settings_dict["OPTIONS"]}
187 else:
188 conn_params = {**settings_dict["OPTIONS"]}
190 conn_params.pop("isolation_level", None)
191 if settings_dict["USER"]: 191 ↛ 193line 191 didn't jump to line 193, because the condition on line 191 was never false
192 conn_params["user"] = settings_dict["USER"]
193 if settings_dict["PASSWORD"]: 193 ↛ 195line 193 didn't jump to line 195, because the condition on line 193 was never false
194 conn_params["password"] = settings_dict["PASSWORD"]
195 if settings_dict["HOST"]: 195 ↛ 197line 195 didn't jump to line 197, because the condition on line 195 was never false
196 conn_params["host"] = settings_dict["HOST"]
197 if settings_dict["PORT"]: 197 ↛ 199line 197 didn't jump to line 199, because the condition on line 197 was never false
198 conn_params["port"] = settings_dict["PORT"]
199 return conn_params
201 @async_unsafe
202 def get_new_connection(self, conn_params):
203 connection = Database.connect(**conn_params)
205 # self.isolation_level must be set:
206 # - after connecting to the database in order to obtain the database's
207 # default when no value is explicitly specified in options.
208 # - before calling _set_autocommit() because if autocommit is on, that
209 # will set connection.isolation_level to ISOLATION_LEVEL_AUTOCOMMIT.
210 options = self.settings_dict["OPTIONS"]
211 try:
212 self.isolation_level = options["isolation_level"]
213 except KeyError:
214 self.isolation_level = connection.isolation_level
215 else:
216 # Set the isolation level to the value from OPTIONS.
217 if self.isolation_level != connection.isolation_level:
218 connection.set_session(isolation_level=self.isolation_level)
219 # Register dummy loads() to avoid a round trip from psycopg2's decode
220 # to json.dumps() to json.loads(), when using a custom decoder in
221 # JSONField.
222 psycopg2.extras.register_default_jsonb(
223 conn_or_curs=connection, loads=lambda x: x
224 )
225 return connection
227 def ensure_timezone(self):
228 if self.connection is None: 228 ↛ 229line 228 didn't jump to line 229, because the condition on line 228 was never true
229 return False
230 conn_timezone_name = self.connection.get_parameter_status("TimeZone")
231 timezone_name = self.timezone_name
232 if timezone_name and conn_timezone_name != timezone_name: 232 ↛ 233line 232 didn't jump to line 233, because the condition on line 232 was never true
233 with self.connection.cursor() as cursor:
234 cursor.execute(self.ops.set_time_zone_sql(), [timezone_name])
235 return True
236 return False
238 def init_connection_state(self):
239 self.connection.set_client_encoding("UTF8")
241 timezone_changed = self.ensure_timezone()
242 if timezone_changed: 242 ↛ 244line 242 didn't jump to line 244, because the condition on line 242 was never true
243 # Commit after setting the time zone (see #17062)
244 if not self.get_autocommit():
245 self.connection.commit()
247 @async_unsafe
248 def create_cursor(self, name=None):
249 if name: 249 ↛ 252line 249 didn't jump to line 252, because the condition on line 249 was never true
250 # In autocommit mode, the cursor will be used outside of a
251 # transaction, hence use a holdable cursor.
252 cursor = self.connection.cursor(
253 name, scrollable=False, withhold=self.connection.autocommit
254 )
255 else:
256 cursor = self.connection.cursor()
257 cursor.tzinfo_factory = self.tzinfo_factory if settings.USE_TZ else None
258 return cursor
260 def tzinfo_factory(self, offset):
261 return self.timezone
263 @async_unsafe
264 def chunked_cursor(self):
265 self._named_cursor_idx += 1
266 # Get the current async task
267 # Note that right now this is behind @async_unsafe, so this is
268 # unreachable, but in future we'll start loosening this restriction.
269 # For now, it's here so that every use of "threading" is
270 # also async-compatible.
271 try:
272 current_task = asyncio.current_task()
273 except RuntimeError:
274 current_task = None
275 # Current task can be none even if the current_task call didn't error
276 if current_task:
277 task_ident = str(id(current_task))
278 else:
279 task_ident = "sync"
280 # Use that and the thread ident to get a unique name
281 return self._cursor(
282 name="_django_curs_%d_%s_%d"
283 % (
284 # Avoid reusing name in other threads / tasks
285 threading.current_thread().ident,
286 task_ident,
287 self._named_cursor_idx,
288 )
289 )
291 def _set_autocommit(self, autocommit):
292 with self.wrap_database_errors:
293 self.connection.autocommit = autocommit
295 def check_constraints(self, table_names=None):
296 """
297 Check constraints by setting them to immediate. Return them to deferred
298 afterward.
299 """
300 with self.cursor() as cursor:
301 cursor.execute("SET CONSTRAINTS ALL IMMEDIATE")
302 cursor.execute("SET CONSTRAINTS ALL DEFERRED")
304 def is_usable(self):
305 try:
306 # Use a psycopg cursor directly, bypassing Django's utilities.
307 with self.connection.cursor() as cursor:
308 cursor.execute("SELECT 1")
309 except Database.Error:
310 return False
311 else:
312 return True
314 @contextmanager
315 def _nodb_cursor(self):
316 cursor = None
317 try:
318 with super()._nodb_cursor() as cursor:
319 yield cursor
320 except (Database.DatabaseError, WrappedDatabaseError):
321 if cursor is not None:
322 raise
323 warnings.warn(
324 "Normally Django will use a connection to the 'postgres' database "
325 "to avoid running initialization queries against the production "
326 "database when it's not needed (for example, when running tests). "
327 "Django was unable to create a connection to the 'postgres' database "
328 "and will use the first PostgreSQL database instead.",
329 RuntimeWarning,
330 )
331 for connection in connections.all():
332 if (
333 connection.vendor == "postgresql"
334 and connection.settings_dict["NAME"] != "postgres"
335 ):
336 conn = self.__class__(
337 {
338 **self.settings_dict,
339 "NAME": connection.settings_dict["NAME"],
340 },
341 alias=self.alias,
342 )
343 try:
344 with conn.cursor() as cursor:
345 yield cursor
346 finally:
347 conn.close()
348 break
349 else:
350 raise
352 @cached_property
353 def pg_version(self):
354 with self.temporary_connection():
355 return self.connection.server_version
357 def make_debug_cursor(self, cursor):
358 return CursorDebugWrapper(cursor, self)
361class CursorDebugWrapper(BaseCursorDebugWrapper):
362 def copy_expert(self, sql, file, *args):
363 with self.debug_sql(sql):
364 return self.cursor.copy_expert(sql, file, *args)
366 def copy_to(self, file, table, *args, **kwargs):
367 with self.debug_sql(sql="COPY %s TO STDOUT" % table):
368 return self.cursor.copy_to(file, table, *args, **kwargs)