Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/db/backends/utils.py: 45%
147 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1import datetime
2import decimal
3import functools
4import hashlib
5import logging
6import time
7from contextlib import contextmanager
9from django.db import NotSupportedError
10from django.utils.dateparse import parse_time
12logger = logging.getLogger("django.db.backends")
class CursorWrapper:
    """
    Proxy around a database driver cursor.

    Attributes that aren't defined here are looked up on the wrapped cursor.
    The fetch-style methods listed in WRAP_ERROR_ATTRS are passed through
    ``db.wrap_database_errors`` before being returned, and statement
    execution runs inside that same handler.
    """

    WRAP_ERROR_ATTRS = frozenset(["fetchone", "fetchmany", "fetchall", "nextset"])

    def __init__(self, cursor, db):
        self.cursor = cursor
        self.db = db

    def __getattr__(self, attr):
        # Python only calls this for names that normal lookup didn't find on
        # the wrapper, so everything else is delegated to the real cursor.
        target = getattr(self.cursor, attr)
        if attr not in CursorWrapper.WRAP_ERROR_ATTRS:
            return target
        return self.db.wrap_database_errors(target)

    def __iter__(self):
        with self.db.wrap_database_errors:
            yield from self.cursor

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        # Close instead of passing through to avoid backend-specific behavior
        # (#17671). Database errors raised while closing are deliberately
        # ignored: a failure in cleanup code isn't useful to the caller.
        try:
            self.close()
        except self.db.Database.Error:
            pass

    # The following methods cannot be implemented in __getattr__, because the
    # code must run when the method is invoked, not just when it is accessed.

    def callproc(self, procname, params=None, kparams=None):
        """
        Call the stored procedure ``procname`` on the wrapped cursor.

        Only the arguments that were actually supplied are forwarded. Raise
        NotSupportedError if ``kparams`` is given and the backend's features
        don't declare support for keyword parameters.
        """
        # Keyword parameters for callproc aren't supported in PEP 249, but the
        # database driver may support them (e.g. cx_Oracle).
        if kparams is not None and not self.db.features.supports_callproc_kwargs:
            raise NotSupportedError(
                "Keyword parameters for callproc are not supported on this "
                "database backend."
            )
        self.db.validate_no_broken_transaction()
        with self.db.wrap_database_errors:
            if kparams is not None:
                # Positional params must be present when keywords are passed.
                return self.cursor.callproc(procname, params or (), kparams)
            if params is not None:
                return self.cursor.callproc(procname, params)
            return self.cursor.callproc(procname)

    def execute(self, sql, params=None):
        """Run a single statement through the connection's execute wrappers."""
        return self._execute_with_wrappers(
            sql, params, many=False, executor=self._execute
        )

    def executemany(self, sql, param_list):
        """Run a statement once per parameter set through the execute wrappers."""
        return self._execute_with_wrappers(
            sql, param_list, many=True, executor=self._executemany
        )

    def _execute_with_wrappers(self, sql, params, many, executor):
        """
        Nest ``executor`` inside every wrapper in ``db.execute_wrappers`` and
        invoke the result. The first wrapper in the list ends up outermost.
        """
        context = {"connection": self.db, "cursor": self}
        chained = functools.reduce(
            lambda inner, wrapper: functools.partial(wrapper, inner),
            reversed(self.db.execute_wrappers),
            executor,
        )
        return chained(sql, params, many, context)

    def _execute(self, sql, params, *ignored_wrapper_args):
        self.db.validate_no_broken_transaction()
        with self.db.wrap_database_errors:
            if params is not None:
                return self.cursor.execute(sql, params)
            # Omit params entirely: the default might be backend specific.
            return self.cursor.execute(sql)

    def _executemany(self, sql, param_list, *ignored_wrapper_args):
        self.db.validate_no_broken_transaction()
        with self.db.wrap_database_errors:
            return self.cursor.executemany(sql, param_list)
class CursorDebugWrapper(CursorWrapper):
    """
    CursorWrapper that times every execute()/executemany() call and records
    the statement in ``db.queries_log`` and in the module logger.
    """

    # XXX callproc isn't instrumented at this time.

    def execute(self, sql, params=None):
        with self.debug_sql(sql, params, use_last_executed_query=True):
            return super().execute(sql, params)

    def executemany(self, sql, param_list):
        with self.debug_sql(sql, param_list, many=True):
            return super().executemany(sql, param_list)

    @contextmanager
    def debug_sql(
        self, sql=None, params=None, use_last_executed_query=False, many=False
    ):
        """
        Context manager that measures the wrapped block with a monotonic
        clock and logs the statement afterwards, whether or not the block
        raised.
        """
        started_at = time.monotonic()
        try:
            yield
        finally:
            duration = time.monotonic() - started_at
            if use_last_executed_query:
                sql = self.db.ops.last_executed_query(self.cursor, sql, params)
            if many:
                try:
                    times = len(params)
                except TypeError:
                    # params could be an iterator.
                    times = "?"
                logged_sql = "%s times: %s" % (times, sql)
            else:
                logged_sql = sql
            self.db.queries_log.append(
                {
                    "sql": logged_sql,
                    "time": "%.3f" % duration,
                }
            )
            logger.debug(
                "(%.3f) %s; args=%s; alias=%s",
                duration,
                sql,
                params,
                self.db.alias,
                extra={
                    "duration": duration,
                    "sql": sql,
                    "params": params,
                    "alias": self.db.alias,
                },
            )
def split_tzname_delta(tzname):
    """
    Break a time zone name into a (name, sign, offset) 3-tuple.

    The name is split at the last "+" or "-" it contains; the split is only
    accepted when the trailing part is non-empty and parse_time() returns a
    truthy value for it. Otherwise return (tzname, None, None).
    """
    for sign in ("+", "-"):
        if sign not in tzname:
            continue
        name, _, offset = tzname.rpartition(sign)
        if offset and parse_time(offset):
            return name, sign, offset
    return tzname, None, None
159###############################################
160# Converters from database (string) to Python #
161###############################################
def typecast_date(s):
    """
    Convert a "YYYY-MM-DD" string into a datetime.date.

    Return None when ``s`` is empty or None (a database NULL).
    """
    if not s:
        return None
    year, *rest = map(int, s.split("-"))
    return datetime.date(year, *rest)
def typecast_time(s):  # does NOT store time zone information
    """
    Convert an "HH:MM:SS[.ffffff]" string into a naive datetime.time.

    Return None when ``s`` is empty or None. The fractional part is padded
    with zeros or truncated to six digits of microseconds.
    """
    if not s:
        return None
    hour, minutes, seconds = s.split(":")
    microseconds = "0"
    if "." in seconds:  # the seconds carry a fractional part
        seconds, microseconds = seconds.split(".")
    return datetime.time(
        int(hour),
        int(minutes),
        int(seconds),
        int(microseconds.ljust(6, "0")[:6]),
    )
def typecast_timestamp(s):  # does NOT store time zone information
    """
    Convert a timestamp string into a naive datetime.datetime.

    Accepted shapes look like "2005-07-29 15:48:00.590358-05" or
    "2005-07-29 09:56:00-05"; any UTC offset after the time is discarded.
    A value without a space is handed to typecast_date(), and an empty or
    None value yields None.
    """
    if not s:
        return None
    if " " not in s:
        return typecast_date(s)
    date_part, time_part = s.split()
    # Remove timezone information: keep what precedes the first "-",
    # or failing that the first "+".
    for sign in ("-", "+"):
        if sign in time_part:
            time_part = time_part.split(sign, 1)[0]
            break
    dates = date_part.split("-")
    times = time_part.split(":")
    seconds = times[2]
    microseconds = "0"
    if "." in seconds:  # the seconds carry a fractional part
        seconds, microseconds = seconds.split(".")
    return datetime.datetime(
        int(dates[0]),
        int(dates[1]),
        int(dates[2]),
        int(times[0]),
        int(times[1]),
        int(seconds),
        int(microseconds.ljust(6, "0")[:6]),
    )
214###############################################
215# Converters from Python to database (string) #
216###############################################
def split_identifier(identifier):
    """
    Split an SQL identifier into a two element tuple of (namespace, name).

    The identifier could be a table, column, or sequence name, and it might
    be prefixed by a namespace. Surrounding double quotes are stripped from
    both elements; the namespace is "" when the identifier doesn't consist
    of exactly two '"."'-separated parts.
    """
    parts = identifier.split('"."')
    if len(parts) == 2:
        namespace, name = parts
    else:
        namespace, name = "", identifier
    return namespace.strip('"'), name.strip('"')
def truncate_name(identifier, length=None, hash_len=4):
    """
    Shorten an SQL identifier to a repeatable mangled version with the given
    length.

    If a quote stripped name contains a namespace, e.g. USERNAME"."TABLE,
    truncate the table portion only. The identifier is returned untouched
    when no length is given or the name already fits; otherwise the name is
    cut to ``length - hash_len`` characters and a ``hash_len``-character
    digest of the full name is appended.
    """
    namespace, name = split_identifier(identifier)

    already_fits = length is None or len(name) <= length
    if already_fits:
        return identifier

    digest = names_digest(name, length=hash_len)
    prefix = '%s"."' % namespace if namespace else ""
    kept = name[: length - hash_len]
    return "%s%s%s" % (prefix, kept, digest)
def names_digest(*args, length):
    """
    Generate a short hexadecimal digest of a set of string arguments that
    can be used to shorten identifying names.

    Return the first ``length`` hex characters of the MD5 digest of the
    arguments, each encoded with str.encode() and hashed in order.
    """
    try:
        # The digest only mangles names, so declare it non-security use;
        # otherwise MD5 may be refused on FIPS-restricted builds.
        h = hashlib.md5(usedforsecurity=False)
    except TypeError:
        # Interpreters older than Python 3.9 don't accept the flag.
        h = hashlib.md5()
    for arg in args:
        h.update(arg.encode())
    return h.hexdigest()[:length]
def format_number(value, max_digits, decimal_places):
    """
    Render a decimal value as a plain (non-exponent) string honoring the
    requested number of digits and decimal places.

    Return None for a None value. ``max_digits``, when given, becomes the
    precision of a private copy of the current decimal context.
    """
    if value is None:
        return None
    ctx = decimal.getcontext().copy()
    if max_digits is not None:
        ctx.prec = max_digits
    if decimal_places is None:
        # No fixed scale: trap Rounded so fitting the value into the
        # context's precision signals instead of rounding quietly.
        ctx.traps[decimal.Rounded] = 1
        value = ctx.create_decimal(value)
    else:
        quantum = decimal.Decimal(1).scaleb(-decimal_places)
        value = value.quantize(quantum, context=ctx)
    return "{:f}".format(value)
def strip_quotes(table_name):
    """
    Remove one pair of enclosing double quotes from a quoted table name so
    it can be used in index names, sequence names, etc. For example
    '"USER"."TABLE"' (an Oracle naming scheme) becomes 'USER"."TABLE'.

    Names that don't both start and end with a double quote are returned
    unchanged.
    """
    if table_name.startswith('"') and table_name.endswith('"'):
        return table_name[1:-1]
    return table_name