Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/db/backends/postgresql/introspection.py: 51%

43 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1from django.db.backends.base.introspection import ( 

2 BaseDatabaseIntrospection, 

3 FieldInfo, 

4 TableInfo, 

5) 

6from django.db.models import Index 

7 

8 

class DatabaseIntrospection(BaseDatabaseIntrospection):
    """Introspect tables, columns, sequences, keys and indexes via pg_catalog."""

    # Maps type codes to Django Field types.
    data_types_reverse = {
        16: "BooleanField",
        17: "BinaryField",
        20: "BigIntegerField",
        21: "SmallIntegerField",
        23: "IntegerField",
        25: "TextField",
        700: "FloatField",
        701: "FloatField",
        869: "GenericIPAddressField",
        1042: "CharField",  # blank-padded
        1043: "CharField",
        1082: "DateField",
        1083: "TimeField",
        1114: "DateTimeField",
        1184: "DateTimeField",
        1186: "DurationField",
        1266: "TimeField",
        1700: "DecimalField",
        2950: "UUIDField",
        3802: "JSONField",
    }
    # A hook for subclasses.
    index_default_access_method = "btree"

    # Table names listed here are left out of get_table_list().
    ignored_tables = []

    def get_field_type(self, data_type, description):
        """
        Return the field type name for a column, upgrading the integer types
        to their Auto* counterparts when the column default contains
        "nextval".
        """
        field_type = super().get_field_type(data_type, description)
        if not (description.default and "nextval" in description.default):
            return field_type
        # (plain integer field, auto-incrementing equivalent)
        auto_variants = (
            ("IntegerField", "AutoField"),
            ("BigIntegerField", "BigAutoField"),
            ("SmallIntegerField", "SmallAutoField"),
        )
        for plain_name, auto_name in auto_variants:
            if field_type == plain_name:
                return auto_name
        return field_type

    def get_table_list(self, cursor):
        """
        Return a TableInfo for every visible table and view in the current
        database, skipping names found in ``ignored_tables``.
        """
        # The second column is 'p' for partitions, 'v' for (materialized)
        # views and 't' for everything else.
        sql = """
            SELECT
                c.relname,
                CASE
                    WHEN c.relispartition THEN 'p'
                    WHEN c.relkind IN ('m', 'v') THEN 'v'
                    ELSE 't'
                END
            FROM pg_catalog.pg_class c
            LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
            WHERE c.relkind IN ('f', 'm', 'p', 'r', 'v')
                AND n.nspname NOT IN ('pg_catalog', 'pg_toast')
                AND pg_catalog.pg_table_is_visible(c.oid)
        """
        cursor.execute(sql)
        tables = []
        for row in cursor.fetchall():
            if row[0] in self.ignored_tables:
                continue
            tables.append(TableInfo(*row))
        return tables

    def get_table_description(self, cursor, table_name):
        """
        Return one FieldInfo per column of ``table_name``, combining the
        DB-API cursor.description entries with catalog metadata.
        """
        # Query the pg_catalog tables as cursor.description does not reliably
        # return the nullable property and information_schema.columns does not
        # contain details of materialized views.
        sql = """
            SELECT
                a.attname AS column_name,
                NOT (a.attnotnull OR (t.typtype = 'd' AND t.typnotnull)) AS is_nullable,
                pg_get_expr(ad.adbin, ad.adrelid) AS column_default,
                CASE WHEN collname = 'default' THEN NULL ELSE collname END AS collation
            FROM pg_attribute a
            LEFT JOIN pg_attrdef ad ON a.attrelid = ad.adrelid AND a.attnum = ad.adnum
            LEFT JOIN pg_collation co ON a.attcollation = co.oid
            JOIN pg_type t ON a.atttypid = t.oid
            JOIN pg_class c ON a.attrelid = c.oid
            JOIN pg_namespace n ON c.relnamespace = n.oid
            WHERE c.relkind IN ('f', 'm', 'p', 'r', 'v')
                AND c.relname = %s
                AND n.nspname NOT IN ('pg_catalog', 'pg_toast')
                AND pg_catalog.pg_table_is_visible(c.oid)
        """
        cursor.execute(sql, [table_name])
        # column name -> (is_nullable, column_default, collation)
        extras_by_column = {}
        for row in cursor.fetchall():
            extras_by_column[row[0]] = row[1:]
        # Run a one-row select purely to populate cursor.description.
        quoted_name = self.connection.ops.quote_name(table_name)
        cursor.execute("SELECT * FROM %s LIMIT 1" % quoted_name)
        fields = []
        for column in cursor.description:
            fields.append(
                FieldInfo(
                    column.name,
                    column.type_code,
                    column.display_size,
                    column.internal_size,
                    column.precision,
                    column.scale,
                    *extras_by_column[column.name],
                )
            )
        return fields

    def get_sequences(self, cursor, table_name, table_fields=()):
        """
        Return a list of {"name", "table", "column"} dicts, one per sequence
        that a column default of ``table_name`` depends on.
        """
        sql = """
            SELECT s.relname as sequence_name, col.attname
            FROM pg_class s
                JOIN pg_namespace sn ON sn.oid = s.relnamespace
                JOIN
                    pg_depend d ON d.refobjid = s.oid
                    AND d.refclassid = 'pg_class'::regclass
                JOIN
                    pg_attrdef ad ON ad.oid = d.objid
                    AND d.classid = 'pg_attrdef'::regclass
                JOIN
                    pg_attribute col ON col.attrelid = ad.adrelid
                    AND col.attnum = ad.adnum
                JOIN pg_class tbl ON tbl.oid = ad.adrelid
            WHERE s.relkind = 'S'
              AND d.deptype in ('a', 'n')
              AND pg_catalog.pg_table_is_visible(tbl.oid)
              AND tbl.relname = %s
        """
        cursor.execute(sql, [table_name])
        sequences = []
        for row in cursor.fetchall():
            sequences.append({"name": row[0], "table": table_name, "column": row[1]})
        return sequences

    def get_relations(self, cursor, table_name):
        """
        Return a dictionary of {field_name: (field_name_other_table, other_table)}
        built from the rows of get_key_columns() for the given table.
        """
        relations = {}
        for row in self.get_key_columns(cursor, table_name):
            # Rows are (column, other_table, other_column).
            relations[row[0]] = (row[2], row[1])
        return relations

    def get_key_columns(self, cursor, table_name):
        """
        Return (column, referenced_table, referenced_column) rows for the
        foreign key constraints of ``table_name``.
        """
        # Only the first entry of conkey/confkey is joined, and only
        # references within the same namespace are returned.
        sql = """
            SELECT a1.attname, c2.relname, a2.attname
            FROM pg_constraint con
            LEFT JOIN pg_class c1 ON con.conrelid = c1.oid
            LEFT JOIN pg_class c2 ON con.confrelid = c2.oid
            LEFT JOIN
                pg_attribute a1 ON c1.oid = a1.attrelid AND a1.attnum = con.conkey[1]
            LEFT JOIN
                pg_attribute a2 ON c2.oid = a2.attrelid AND a2.attnum = con.confkey[1]
            WHERE
                c1.relname = %s AND
                con.contype = 'f' AND
                c1.relnamespace = c2.relnamespace AND
                pg_catalog.pg_table_is_visible(c1.oid)
        """
        cursor.execute(sql, [table_name])
        return cursor.fetchall()

    def get_constraints(self, cursor, table_name):
        """
        Return a dict mapping each constraint or index name on ``table_name``
        to a dict describing it (columns, primary_key, unique, foreign_key,
        check, index, definition, options; indexes also carry orders and
        type). Expression-based indexes include their definition.
        """
        constraints = {}
        # Loop over the key table, collecting things as constraints. The column
        # array must return column names in the same order in which they were
        # created.
        constraint_sql = """
            SELECT
                c.conname,
                array(
                    SELECT attname
                    FROM unnest(c.conkey) WITH ORDINALITY cols(colid, arridx)
                    JOIN pg_attribute AS ca ON cols.colid = ca.attnum
                    WHERE ca.attrelid = c.conrelid
                    ORDER BY cols.arridx
                ),
                c.contype,
                (SELECT fkc.relname || '.' || fka.attname
                FROM pg_attribute AS fka
                JOIN pg_class AS fkc ON fka.attrelid = fkc.oid
                WHERE fka.attrelid = c.confrelid AND fka.attnum = c.confkey[1]),
                cl.reloptions
            FROM pg_constraint AS c
            JOIN pg_class AS cl ON c.conrelid = cl.oid
            WHERE cl.relname = %s AND pg_catalog.pg_table_is_visible(cl.oid)
        """
        cursor.execute(constraint_sql, [table_name])
        for name, column_names, contype, fk_target, reloptions in cursor.fetchall():
            if contype == "f":
                # "table.column" -> ("table", "column")
                foreign_key = tuple(fk_target.split(".", 1))
            else:
                foreign_key = None
            constraints[name] = {
                "columns": column_names,
                "primary_key": contype == "p",
                "unique": contype in ("p", "u"),
                "foreign_key": foreign_key,
                "check": contype == "c",
                "index": False,
                "definition": None,
                "options": reloptions,
            }
        # Now get indexes
        index_sql = """
            SELECT
                indexname,
                array_agg(attname ORDER BY arridx),
                indisunique,
                indisprimary,
                array_agg(ordering ORDER BY arridx),
                amname,
                exprdef,
                s2.attoptions
            FROM (
                SELECT
                    c2.relname as indexname, idx.*, attr.attname, am.amname,
                    CASE
                        WHEN idx.indexprs IS NOT NULL THEN
                            pg_get_indexdef(idx.indexrelid)
                    END AS exprdef,
                    CASE am.amname
                        WHEN %s THEN
                            CASE (option & 1)
                                WHEN 1 THEN 'DESC' ELSE 'ASC'
                            END
                    END as ordering,
                    c2.reloptions as attoptions
                FROM (
                    SELECT *
                    FROM
                        pg_index i,
                        unnest(i.indkey, i.indoption)
                            WITH ORDINALITY koi(key, option, arridx)
                ) idx
                LEFT JOIN pg_class c ON idx.indrelid = c.oid
                LEFT JOIN pg_class c2 ON idx.indexrelid = c2.oid
                LEFT JOIN pg_am am ON c2.relam = am.oid
                LEFT JOIN
                    pg_attribute attr ON attr.attrelid = c.oid AND attr.attnum = idx.key
                WHERE c.relname = %s AND pg_catalog.pg_table_is_visible(c.oid)
            ) s2
            GROUP BY indexname, indisunique, indisprimary, amname, exprdef, attoptions;
        """
        cursor.execute(index_sql, [self.index_default_access_method, table_name])
        for (
            index_name,
            column_names,
            is_unique,
            is_primary,
            orderings,
            access_method,
            expression_def,
            reloptions,
        ) in cursor.fetchall():
            # Indexes that back a constraint collected above are not re-added.
            if index_name in constraints:
                continue
            basic_index = (
                access_method == self.index_default_access_method
                and
                # '_btree' references
                # django.contrib.postgres.indexes.BTreeIndex.suffix.
                not index_name.endswith("_btree")
                and reloptions is None
            )
            if basic_index:
                index_type = Index.suffix
            else:
                index_type = access_method
            # A lone NULL in the aggregated arrays is reported as an empty list.
            if column_names == [None]:
                column_names = []
            if orderings == [None]:
                orderings = []
            constraints[index_name] = {
                "columns": column_names,
                "orders": orderings,
                "primary_key": is_primary,
                "unique": is_unique,
                "foreign_key": None,
                "check": False,
                "index": True,
                "type": index_type,
                "definition": expression_def,
                "options": reloptions,
            }
        return constraints