Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/db/backends/postgresql/base.py: 68%

170 statements  

coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1""" 

2PostgreSQL database backend for Django. 

3 

4Requires psycopg 2: https://www.psycopg.org/ 

5""" 

6 

7import asyncio 

8import threading 

9import warnings 

10from contextlib import contextmanager 

11 

12from django.conf import settings 

13from django.core.exceptions import ImproperlyConfigured 

14from django.db import DatabaseError as WrappedDatabaseError 

15from django.db import connections 

16from django.db.backends.base.base import BaseDatabaseWrapper 

17from django.db.backends.utils import CursorDebugWrapper as BaseCursorDebugWrapper 

18from django.utils.asyncio import async_unsafe 

19from django.utils.functional import cached_property 

20from django.utils.safestring import SafeString 

21from django.utils.version import get_version_tuple 

22 

23try: 

24 import psycopg2 as Database 

25 import psycopg2.extensions 

26 import psycopg2.extras 

27except ImportError as e: 

28 raise ImproperlyConfigured("Error loading psycopg2 module: %s" % e) 

29 

30 

31 def psycopg2_version():

32 version = psycopg2.__version__.split(" ", 1)[0] 

33 return get_version_tuple(version) 

34 

35 

36 PSYCOPG2_VERSION = psycopg2_version()

37 

38 if PSYCOPG2_VERSION < (2, 8, 4):  [38 ↛ 39] line 38 didn't jump to line 39, because the condition on line 38 was never true

39 raise ImproperlyConfigured( 

40 "psycopg2 version 2.8.4 or newer is required; you have %s" 

41 % psycopg2.__version__ 

42 ) 

43 

44 

45 # Some of these import psycopg2, so import them after checking if it's installed.

46 from .client import DatabaseClient # NOQA

47 from .creation import DatabaseCreation # NOQA

48 from .features import DatabaseFeatures # NOQA

49 from .introspection import DatabaseIntrospection # NOQA

50 from .operations import DatabaseOperations # NOQA

51 from .schema import DatabaseSchemaEditor # NOQA

52

53 psycopg2.extensions.register_adapter(SafeString, psycopg2.extensions.QuotedString)

54 psycopg2.extras.register_uuid()

55 

56 # Register support for inet[] manually so we don't have to handle the Inet()

57 # object on load all the time.

58 INETARRAY_OID = 1041

59 INETARRAY = psycopg2.extensions.new_array_type(

60 (INETARRAY_OID,),

61 "INETARRAY",

62 psycopg2.extensions.UNICODE,

63 )

64 psycopg2.extensions.register_type(INETARRAY)
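To illustrate what this registration buys, here is a small sketch: without it, psycopg2 hands an inet[] value back as the raw array literal string; with it, you get a plain Python list of strings. The DSN below is a placeholder; any reachable PostgreSQL server will do.

import psycopg2
import psycopg2.extensions

# Mirror the registration above (it is global to the process).
INETARRAY = psycopg2.extensions.new_array_type(
    (1041,), "INETARRAY", psycopg2.extensions.UNICODE
)
psycopg2.extensions.register_type(INETARRAY)

conn = psycopg2.connect("dbname=postgres")  # placeholder DSN
with conn.cursor() as cur:
    cur.execute("SELECT ARRAY['192.0.2.1', '2001:db8::1']::inet[]")
    value = cur.fetchone()[0]
conn.close()

# With the registration, `value` is ['192.0.2.1', '2001:db8::1'];
# without it, psycopg2 would return the single string '{192.0.2.1,2001:db8::1}'.
print(value)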

65 

66 

67 class DatabaseWrapper(BaseDatabaseWrapper):

68 vendor = "postgresql" 

69 display_name = "PostgreSQL" 

70 # This dictionary maps Field objects to their associated PostgreSQL column 

71 # types, as strings. Column-type strings can contain format strings; they'll 

72 # be interpolated against the values of Field.__dict__ before being output. 

73 # If a column type is set to None, it won't be included in the output. 

74 data_types = { 

75 "AutoField": "serial", 

76 "BigAutoField": "bigserial", 

77 "BinaryField": "bytea", 

78 "BooleanField": "boolean", 

79 "CharField": "varchar(%(max_length)s)", 

80 "DateField": "date", 

81 "DateTimeField": "timestamp with time zone", 

82 "DecimalField": "numeric(%(max_digits)s, %(decimal_places)s)", 

83 "DurationField": "interval", 

84 "FileField": "varchar(%(max_length)s)", 

85 "FilePathField": "varchar(%(max_length)s)", 

86 "FloatField": "double precision", 

87 "IntegerField": "integer", 

88 "BigIntegerField": "bigint", 

89 "IPAddressField": "inet", 

90 "GenericIPAddressField": "inet", 

91 "JSONField": "jsonb", 

92 "OneToOneField": "integer", 

93 "PositiveBigIntegerField": "bigint", 

94 "PositiveIntegerField": "integer", 

95 "PositiveSmallIntegerField": "smallint", 

96 "SlugField": "varchar(%(max_length)s)", 

97 "SmallAutoField": "smallserial", 

98 "SmallIntegerField": "smallint", 

99 "TextField": "text", 

100 "TimeField": "time", 

101 "UUIDField": "uuid", 

102 } 
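As the comment above says, these column-type strings are plain %-format templates interpolated against the field's attributes. A minimal sketch of that interpolation (the column_type helper is purely illustrative, not a Django API):

data_types = {
    "CharField": "varchar(%(max_length)s)",
    "DecimalField": "numeric(%(max_digits)s, %(decimal_places)s)",
}

def column_type(internal_type, **field_attrs):
    # Fill the template from the field's attributes (max_length, max_digits, ...).
    return data_types[internal_type] % field_attrs

print(column_type("CharField", max_length=100))                      # varchar(100)
print(column_type("DecimalField", max_digits=10, decimal_places=2))  # numeric(10, 2)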

103 data_type_check_constraints = { 

104 "PositiveBigIntegerField": '"%(column)s" >= 0', 

105 "PositiveIntegerField": '"%(column)s" >= 0', 

106 "PositiveSmallIntegerField": '"%(column)s" >= 0', 

107 } 

108 operators = { 

109 "exact": "= %s", 

110 "iexact": "= UPPER(%s)", 

111 "contains": "LIKE %s", 

112 "icontains": "LIKE UPPER(%s)", 

113 "regex": "~ %s", 

114 "iregex": "~* %s", 

115 "gt": "> %s", 

116 "gte": ">= %s", 

117 "lt": "< %s", 

118 "lte": "<= %s", 

119 "startswith": "LIKE %s", 

120 "endswith": "LIKE %s", 

121 "istartswith": "LIKE UPPER(%s)", 

122 "iendswith": "LIKE UPPER(%s)", 

123 } 

124 

125 # The patterns below are used to generate SQL pattern lookup clauses when 

126 # the right-hand side of the lookup isn't a raw string (it might be an expression 

127 # or the result of a bilateral transformation). 

128 # In those cases, special characters for LIKE operators (e.g. \, *, _) should be 

129 # escaped on database side. 

130 # 

131 # Note: we use str.format() here for readability as '%' is used as a wildcard for 

132 # the LIKE operator. 

133 pattern_esc = ( 

134 r"REPLACE(REPLACE(REPLACE({}, E'\\', E'\\\\'), E'%%', E'\\%%'), E'_', E'\\_')" 

135 ) 

136 pattern_ops = { 

137 "contains": "LIKE '%%' || {} || '%%'", 

138 "icontains": "LIKE '%%' || UPPER({}) || '%%'", 

139 "startswith": "LIKE {} || '%%'", 

140 "istartswith": "LIKE UPPER({}) || '%%'", 

141 "endswith": "LIKE '%%' || {}", 

142 "iendswith": "LIKE '%%' || UPPER({})", 

143 } 
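To see how pattern_esc and pattern_ops fit together, here is a sketch of an icontains lookup whose right-hand side is another column (column names are made up; the real composition happens inside Django's lookup machinery):

pattern_esc = (
    r"REPLACE(REPLACE(REPLACE({}, E'\\', E'\\\\'), E'%%', E'\\%%'), E'_', E'\\_')"
)
icontains = "LIKE '%%' || UPPER({}) || '%%'"

rhs = pattern_esc.format('"other"."title"')           # escape \, % and _ on the database side
fragment = '"book"."title" ' + icontains.format(rhs)

# The doubled '%%' survives until the fragment passes through %-style parameter
# interpolation; applying an empty interpolation here just shows the final SQL.
print(fragment % ())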

144 

145 Database = Database 

146 SchemaEditorClass = DatabaseSchemaEditor 

147 # Classes instantiated in __init__(). 

148 client_class = DatabaseClient 

149 creation_class = DatabaseCreation 

150 features_class = DatabaseFeatures 

151 introspection_class = DatabaseIntrospection 

152 ops_class = DatabaseOperations 

153 # PostgreSQL backend-specific attributes. 

154 _named_cursor_idx = 0 

155 

156 def get_connection_params(self): 

157 settings_dict = self.settings_dict 

158 # None may be used to connect to the default 'postgres' db 

159 if settings_dict["NAME"] == "" and not settings_dict.get("OPTIONS", {}).get(  [159 ↛ 162] line 159 didn't jump to line 162, because the condition on line 159 was never true

160 "service" 

161 ): 

162 raise ImproperlyConfigured( 

163 "settings.DATABASES is improperly configured. " 

164 "Please supply the NAME or OPTIONS['service'] value." 

165 ) 

166 if len(settings_dict["NAME"] or "") > self.ops.max_name_length():  [166 ↛ 167] line 166 didn't jump to line 167, because the condition on line 166 was never true

167 raise ImproperlyConfigured( 

168 "The database name '%s' (%d characters) is longer than " 

169 "PostgreSQL's limit of %d characters. Supply a shorter NAME " 

170 "in settings.DATABASES." 

171 % ( 

172 settings_dict["NAME"], 

173 len(settings_dict["NAME"]), 

174 self.ops.max_name_length(), 

175 ) 

176 ) 

177 conn_params = {} 

178 if settings_dict["NAME"]: 

179 conn_params = { 

180 "database": settings_dict["NAME"], 

181 **settings_dict["OPTIONS"], 

182 } 

183 elif settings_dict["NAME"] is None:  [183 ↛ 188] line 183 didn't jump to line 188, because the condition on line 183 was never false

184 # Connect to the default 'postgres' db. 

185 settings_dict.get("OPTIONS", {}).pop("service", None) 

186 conn_params = {"database": "postgres", **settings_dict["OPTIONS"]} 

187 else: 

188 conn_params = {**settings_dict["OPTIONS"]} 

189 

190 conn_params.pop("isolation_level", None) 

191 if settings_dict["USER"]:  [191 ↛ 193] line 191 didn't jump to line 193, because the condition on line 191 was never false

192 conn_params["user"] = settings_dict["USER"] 

193 if settings_dict["PASSWORD"]:  [193 ↛ 195] line 193 didn't jump to line 195, because the condition on line 193 was never false

194 conn_params["password"] = settings_dict["PASSWORD"] 

195 if settings_dict["HOST"]:  [195 ↛ 197] line 195 didn't jump to line 197, because the condition on line 195 was never false

196 conn_params["host"] = settings_dict["HOST"] 

197 if settings_dict["PORT"]:  [197 ↛ 199] line 197 didn't jump to line 199, because the condition on line 197 was never false

198 conn_params["port"] = settings_dict["PORT"] 

199 return conn_params 

200 

201 @async_unsafe 

202 def get_new_connection(self, conn_params): 

203 connection = Database.connect(**conn_params) 

204 

205 # self.isolation_level must be set: 

206 # - after connecting to the database in order to obtain the database's 

207 # default when no value is explicitly specified in options. 

208 # - before calling _set_autocommit() because if autocommit is on, that 

209 # will set connection.isolation_level to ISOLATION_LEVEL_AUTOCOMMIT. 

210 options = self.settings_dict["OPTIONS"] 

211 try: 

212 self.isolation_level = options["isolation_level"] 

213 except KeyError: 

214 self.isolation_level = connection.isolation_level 

215 else: 

216 # Set the isolation level to the value from OPTIONS. 

217 if self.isolation_level != connection.isolation_level: 

218 connection.set_session(isolation_level=self.isolation_level) 

219 # Register dummy loads() to avoid a round trip from psycopg2's decode 

220 # to json.dumps() to json.loads(), when using a custom decoder in 

221 # JSONField. 

222 psycopg2.extras.register_default_jsonb( 

223 conn_or_curs=connection, loads=lambda x: x 

224 ) 

225 return connection 

226 
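The isolation_level consumed above comes straight from OPTIONS in settings.DATABASES. A configuration sketch (database name and credentials are placeholders):

# settings.py (illustrative; only the OPTIONS entry matters here)
import psycopg2.extensions

DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.postgresql",
        "NAME": "mydb",        # placeholder
        "USER": "myuser",      # placeholder
        "PASSWORD": "secret",  # placeholder
        "HOST": "localhost",
        "PORT": "5432",
        "OPTIONS": {
            # get_connection_params() pops this key out of conn_params, and
            # get_new_connection() applies it via connection.set_session().
            "isolation_level": psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE,
        },
    }
}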

227 def ensure_timezone(self): 

228 if self.connection is None:  [228 ↛ 229] line 228 didn't jump to line 229, because the condition on line 228 was never true

229 return False 

230 conn_timezone_name = self.connection.get_parameter_status("TimeZone") 

231 timezone_name = self.timezone_name 

232 if timezone_name and conn_timezone_name != timezone_name:  [232 ↛ 233] line 232 didn't jump to line 233, because the condition on line 232 was never true

233 with self.connection.cursor() as cursor: 

234 cursor.execute(self.ops.set_time_zone_sql(), [timezone_name]) 

235 return True 

236 return False 

237 

238 def init_connection_state(self): 

239 self.connection.set_client_encoding("UTF8") 

240 

241 timezone_changed = self.ensure_timezone() 

242 if timezone_changed:  [242 ↛ 244] line 242 didn't jump to line 244, because the condition on line 242 was never true

243 # Commit after setting the time zone (see #17062) 

244 if not self.get_autocommit(): 

245 self.connection.commit() 

246 

247 @async_unsafe 

248 def create_cursor(self, name=None): 

249 if name:  [249 ↛ 252] line 249 didn't jump to line 252, because the condition on line 249 was never true

250 # In autocommit mode, the cursor will be used outside of a 

251 # transaction, hence use a holdable cursor. 

252 cursor = self.connection.cursor( 

253 name, scrollable=False, withhold=self.connection.autocommit 

254 ) 

255 else: 

256 cursor = self.connection.cursor() 

257 cursor.tzinfo_factory = self.tzinfo_factory if settings.USE_TZ else None 

258 return cursor 

259 

260 def tzinfo_factory(self, offset): 

261 return self.timezone 

262 

263 @async_unsafe 

264 def chunked_cursor(self): 

265 self._named_cursor_idx += 1 

266 # Get the current async task 

267 # Note that right now this is behind @async_unsafe, so this is 

268 # unreachable, but in future we'll start loosening this restriction. 

269 # For now, it's here so that every use of "threading" is 

270 # also async-compatible. 

271 try: 

272 current_task = asyncio.current_task() 

273 except RuntimeError: 

274 current_task = None 

275 # Current task can be none even if the current_task call didn't error 

276 if current_task: 

277 task_ident = str(id(current_task)) 

278 else: 

279 task_ident = "sync" 

280 # Use that and the thread ident to get a unique name 

281 return self._cursor( 

282 name="_django_curs_%d_%s_%d" 

283 % ( 

284 # Avoid reusing name in other threads / tasks 

285 threading.current_thread().ident, 

286 task_ident, 

287 self._named_cursor_idx, 

288 ) 

289 ) 
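As the comments above note, the server-side cursor name only has to be unique per thread, task, and call. A quick sketch of the resulting name (values are illustrative):

import threading

_named_cursor_idx = 7   # illustrative counter value
task_ident = "sync"     # no asyncio task running in this sketch

name = "_django_curs_%d_%s_%d" % (
    threading.current_thread().ident,
    task_ident,
    _named_cursor_idx,
)
print(name)  # e.g. "_django_curs_140210433742656_sync_7"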

290 

291 def _set_autocommit(self, autocommit): 

292 with self.wrap_database_errors: 

293 self.connection.autocommit = autocommit 

294 

295 def check_constraints(self, table_names=None): 

296 """ 

297 Check constraints by setting them to immediate. Return them to deferred 

298 afterward. 

299 """ 

300 with self.cursor() as cursor: 

301 cursor.execute("SET CONSTRAINTS ALL IMMEDIATE") 

302 cursor.execute("SET CONSTRAINTS ALL DEFERRED") 

303 

304 def is_usable(self): 

305 try: 

306 # Use a psycopg cursor directly, bypassing Django's utilities. 

307 with self.connection.cursor() as cursor: 

308 cursor.execute("SELECT 1") 

309 except Database.Error: 

310 return False 

311 else: 

312 return True 

313 

314 @contextmanager 

315 def _nodb_cursor(self): 

316 cursor = None 

317 try: 

318 with super()._nodb_cursor() as cursor: 

319 yield cursor 

320 except (Database.DatabaseError, WrappedDatabaseError): 

321 if cursor is not None: 

322 raise 

323 warnings.warn( 

324 "Normally Django will use a connection to the 'postgres' database " 

325 "to avoid running initialization queries against the production " 

326 "database when it's not needed (for example, when running tests). " 

327 "Django was unable to create a connection to the 'postgres' database " 

328 "and will use the first PostgreSQL database instead.", 

329 RuntimeWarning, 

330 ) 

331 for connection in connections.all(): 

332 if ( 

333 connection.vendor == "postgresql" 

334 and connection.settings_dict["NAME"] != "postgres" 

335 ): 

336 conn = self.__class__( 

337 { 

338 **self.settings_dict, 

339 "NAME": connection.settings_dict["NAME"], 

340 }, 

341 alias=self.alias, 

342 ) 

343 try: 

344 with conn.cursor() as cursor: 

345 yield cursor 

346 finally: 

347 conn.close() 

348 break 

349 else: 

350 raise 

351 

352 @cached_property 

353 def pg_version(self): 

354 with self.temporary_connection(): 

355 return self.connection.server_version 

356 

357 def make_debug_cursor(self, cursor): 

358 return CursorDebugWrapper(cursor, self) 

359 

360 

361 class CursorDebugWrapper(BaseCursorDebugWrapper):

362 def copy_expert(self, sql, file, *args): 

363 with self.debug_sql(sql): 

364 return self.cursor.copy_expert(sql, file, *args) 

365 

366 def copy_to(self, file, table, *args, **kwargs): 

367 with self.debug_sql(sql="COPY %s TO STDOUT" % table): 

368 return self.cursor.copy_to(file, table, *args, **kwargs)
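For context, these COPY helpers are reached through an ordinary Django cursor when query logging wraps it in the debug cursor above; a usage sketch (the table name is a placeholder):

import io
from django.db import connection

buf = io.StringIO()
with connection.cursor() as cursor:
    # copy_expert is a psycopg2 cursor method; the wrapper above only adds the
    # debug_sql() bookkeeping so the COPY statement shows up in connection.queries.
    cursor.copy_expert("COPY myapp_book TO STDOUT WITH CSV HEADER", buf)

print(buf.getvalue()[:200])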