Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/db/backends/utils.py: 45%

147 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1import datetime 

2import decimal 

3import functools 

4import hashlib 

5import logging 

6import time 

7from contextlib import contextmanager 

8 

9from django.db import NotSupportedError 

10from django.utils.dateparse import parse_time 

11 

12logger = logging.getLogger("django.db.backends") 

13 

14 

15class CursorWrapper: 

16 def __init__(self, cursor, db): 

17 self.cursor = cursor 

18 self.db = db 

19 

20 WRAP_ERROR_ATTRS = frozenset(["fetchone", "fetchmany", "fetchall", "nextset"]) 

21 

22 def __getattr__(self, attr): 

23 cursor_attr = getattr(self.cursor, attr) 

24 if attr in CursorWrapper.WRAP_ERROR_ATTRS: 

25 return self.db.wrap_database_errors(cursor_attr) 

26 else: 

27 return cursor_attr 

28 

29 def __iter__(self): 

30 with self.db.wrap_database_errors: 

31 yield from self.cursor 

32 

33 def __enter__(self): 

34 return self 

35 

36 def __exit__(self, type, value, traceback): 

37 # Close instead of passing through to avoid backend-specific behavior 

38 # (#17671). Catch errors liberally because errors in cleanup code 

39 # aren't useful. 

40 try: 

41 self.close() 

42 except self.db.Database.Error: 

43 pass 

44 

45 # The following methods cannot be implemented in __getattr__, because the 

46 # code must run when the method is invoked, not just when it is accessed. 

47 

48 def callproc(self, procname, params=None, kparams=None): 

49 # Keyword parameters for callproc aren't supported in PEP 249, but the 

50 # database driver may support them (e.g. cx_Oracle). 

51 if kparams is not None and not self.db.features.supports_callproc_kwargs: 

52 raise NotSupportedError( 

53 "Keyword parameters for callproc are not supported on this " 

54 "database backend." 

55 ) 

56 self.db.validate_no_broken_transaction() 

57 with self.db.wrap_database_errors: 

58 if params is None and kparams is None: 

59 return self.cursor.callproc(procname) 

60 elif kparams is None: 

61 return self.cursor.callproc(procname, params) 

62 else: 

63 params = params or () 

64 return self.cursor.callproc(procname, params, kparams) 

65 

66 def execute(self, sql, params=None): 

67 return self._execute_with_wrappers( 

68 sql, params, many=False, executor=self._execute 

69 ) 

70 

71 def executemany(self, sql, param_list): 

72 return self._execute_with_wrappers( 

73 sql, param_list, many=True, executor=self._executemany 

74 ) 

75 

76 def _execute_with_wrappers(self, sql, params, many, executor): 

77 context = {"connection": self.db, "cursor": self} 

78 for wrapper in reversed(self.db.execute_wrappers): 78 ↛ 79line 78 didn't jump to line 79, because the loop on line 78 never started

79 executor = functools.partial(wrapper, executor) 

80 return executor(sql, params, many, context) 

81 

82 def _execute(self, sql, params, *ignored_wrapper_args): 

83 self.db.validate_no_broken_transaction() 

84 with self.db.wrap_database_errors: 

85 if params is None: 

86 # params default might be backend specific. 

87 return self.cursor.execute(sql) 

88 else: 

89 return self.cursor.execute(sql, params) 

90 

91 def _executemany(self, sql, param_list, *ignored_wrapper_args): 

92 self.db.validate_no_broken_transaction() 

93 with self.db.wrap_database_errors: 

94 return self.cursor.executemany(sql, param_list) 

95 

96 

97class CursorDebugWrapper(CursorWrapper): 

98 

99 # XXX callproc isn't instrumented at this time. 

100 

101 def execute(self, sql, params=None): 

102 with self.debug_sql(sql, params, use_last_executed_query=True): 

103 return super().execute(sql, params) 

104 

105 def executemany(self, sql, param_list): 

106 with self.debug_sql(sql, param_list, many=True): 

107 return super().executemany(sql, param_list) 

108 

109 @contextmanager 

110 def debug_sql( 

111 self, sql=None, params=None, use_last_executed_query=False, many=False 

112 ): 

113 start = time.monotonic() 

114 try: 

115 yield 

116 finally: 

117 stop = time.monotonic() 

118 duration = stop - start 

119 if use_last_executed_query: 

120 sql = self.db.ops.last_executed_query(self.cursor, sql, params) 

121 try: 

122 times = len(params) if many else "" 

123 except TypeError: 

124 # params could be an iterator. 

125 times = "?" 

126 self.db.queries_log.append( 

127 { 

128 "sql": "%s times: %s" % (times, sql) if many else sql, 

129 "time": "%.3f" % duration, 

130 } 

131 ) 

132 logger.debug( 

133 "(%.3f) %s; args=%s; alias=%s", 

134 duration, 

135 sql, 

136 params, 

137 self.db.alias, 

138 extra={ 

139 "duration": duration, 

140 "sql": sql, 

141 "params": params, 

142 "alias": self.db.alias, 

143 }, 

144 ) 

145 

146 

147def split_tzname_delta(tzname): 

148 """ 

149 Split a time zone name into a 3-tuple of (name, sign, offset). 

150 """ 

151 for sign in ["+", "-"]: 

152 if sign in tzname: 152 ↛ 153line 152 didn't jump to line 153, because the condition on line 152 was never true

153 name, offset = tzname.rsplit(sign, 1) 

154 if offset and parse_time(offset): 

155 return name, sign, offset 

156 return tzname, None, None 

157 

158 

159############################################### 

160# Converters from database (string) to Python # 

161############################################### 

162 

163 

164def typecast_date(s): 

165 return ( 

166 datetime.date(*map(int, s.split("-"))) if s else None 

167 ) # return None if s is null 

168 

169 

170def typecast_time(s): # does NOT store time zone information 

171 if not s: 

172 return None 

173 hour, minutes, seconds = s.split(":") 

174 if "." in seconds: # check whether seconds have a fractional part 

175 seconds, microseconds = seconds.split(".") 

176 else: 

177 microseconds = "0" 

178 return datetime.time( 

179 int(hour), int(minutes), int(seconds), int((microseconds + "000000")[:6]) 

180 ) 

181 

182 

183def typecast_timestamp(s): # does NOT store time zone information 

184 # "2005-07-29 15:48:00.590358-05" 

185 # "2005-07-29 09:56:00-05" 

186 if not s: 

187 return None 

188 if " " not in s: 

189 return typecast_date(s) 

190 d, t = s.split() 

191 # Remove timezone information. 

192 if "-" in t: 

193 t, _ = t.split("-", 1) 

194 elif "+" in t: 

195 t, _ = t.split("+", 1) 

196 dates = d.split("-") 

197 times = t.split(":") 

198 seconds = times[2] 

199 if "." in seconds: # check whether seconds have a fractional part 

200 seconds, microseconds = seconds.split(".") 

201 else: 

202 microseconds = "0" 

203 return datetime.datetime( 

204 int(dates[0]), 

205 int(dates[1]), 

206 int(dates[2]), 

207 int(times[0]), 

208 int(times[1]), 

209 int(seconds), 

210 int((microseconds + "000000")[:6]), 

211 ) 

212 

213 

214############################################### 

215# Converters from Python to database (string) # 

216############################################### 

217 

218 

219def split_identifier(identifier): 

220 """ 

221 Split an SQL identifier into a two element tuple of (namespace, name). 

222 

223 The identifier could be a table, column, or sequence name might be prefixed 

224 by a namespace. 

225 """ 

226 try: 

227 namespace, name = identifier.split('"."') 

228 except ValueError: 

229 namespace, name = "", identifier 

230 return namespace.strip('"'), name.strip('"') 

231 

232 

233def truncate_name(identifier, length=None, hash_len=4): 

234 """ 

235 Shorten an SQL identifier to a repeatable mangled version with the given 

236 length. 

237 

238 If a quote stripped name contains a namespace, e.g. USERNAME"."TABLE, 

239 truncate the table portion only. 

240 """ 

241 namespace, name = split_identifier(identifier) 

242 

243 if length is None or len(name) <= length: 243 ↛ 246line 243 didn't jump to line 246, because the condition on line 243 was never false

244 return identifier 

245 

246 digest = names_digest(name, length=hash_len) 

247 return "%s%s%s" % ( 

248 '%s"."' % namespace if namespace else "", 

249 name[: length - hash_len], 

250 digest, 

251 ) 

252 

253 

254def names_digest(*args, length): 

255 """ 

256 Generate a 32-bit digest of a set of arguments that can be used to shorten 

257 identifying names. 

258 """ 

259 h = hashlib.md5() 

260 for arg in args: 

261 h.update(arg.encode()) 

262 return h.hexdigest()[:length] 

263 

264 

265def format_number(value, max_digits, decimal_places): 

266 """ 

267 Format a number into a string with the requisite number of digits and 

268 decimal places. 

269 """ 

270 if value is None: 

271 return None 

272 context = decimal.getcontext().copy() 

273 if max_digits is not None: 

274 context.prec = max_digits 

275 if decimal_places is not None: 

276 value = value.quantize( 

277 decimal.Decimal(1).scaleb(-decimal_places), context=context 

278 ) 

279 else: 

280 context.traps[decimal.Rounded] = 1 

281 value = context.create_decimal(value) 

282 return "{:f}".format(value) 

283 

284 

285def strip_quotes(table_name): 

286 """ 

287 Strip quotes off of quoted table names to make them safe for use in index 

288 names, sequence names, etc. For example '"USER"."TABLE"' (an Oracle naming 

289 scheme) becomes 'USER"."TABLE'. 

290 """ 

291 has_quotes = table_name.startswith('"') and table_name.endswith('"') 

292 return table_name[1:-1] if has_quotes else table_name