Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/db/backends/postgresql/introspection.py: 51%
43 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1from django.db.backends.base.introspection import (
2 BaseDatabaseIntrospection,
3 FieldInfo,
4 TableInfo,
5)
6from django.db.models import Index
9class DatabaseIntrospection(BaseDatabaseIntrospection):
10 # Maps type codes to Django Field types.
11 data_types_reverse = {
12 16: "BooleanField",
13 17: "BinaryField",
14 20: "BigIntegerField",
15 21: "SmallIntegerField",
16 23: "IntegerField",
17 25: "TextField",
18 700: "FloatField",
19 701: "FloatField",
20 869: "GenericIPAddressField",
21 1042: "CharField", # blank-padded
22 1043: "CharField",
23 1082: "DateField",
24 1083: "TimeField",
25 1114: "DateTimeField",
26 1184: "DateTimeField",
27 1186: "DurationField",
28 1266: "TimeField",
29 1700: "DecimalField",
30 2950: "UUIDField",
31 3802: "JSONField",
32 }
33 # A hook for subclasses.
34 index_default_access_method = "btree"
36 ignored_tables = []
38 def get_field_type(self, data_type, description):
39 field_type = super().get_field_type(data_type, description)
40 if description.default and "nextval" in description.default:
41 if field_type == "IntegerField":
42 return "AutoField"
43 elif field_type == "BigIntegerField":
44 return "BigAutoField"
45 elif field_type == "SmallIntegerField":
46 return "SmallAutoField"
47 return field_type
49 def get_table_list(self, cursor):
50 """Return a list of table and view names in the current database."""
51 cursor.execute(
52 """
53 SELECT
54 c.relname,
55 CASE
56 WHEN c.relispartition THEN 'p'
57 WHEN c.relkind IN ('m', 'v') THEN 'v'
58 ELSE 't'
59 END
60 FROM pg_catalog.pg_class c
61 LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
62 WHERE c.relkind IN ('f', 'm', 'p', 'r', 'v')
63 AND n.nspname NOT IN ('pg_catalog', 'pg_toast')
64 AND pg_catalog.pg_table_is_visible(c.oid)
65 """
66 )
67 return [
68 TableInfo(*row)
69 for row in cursor.fetchall()
70 if row[0] not in self.ignored_tables
71 ]
73 def get_table_description(self, cursor, table_name):
74 """
75 Return a description of the table with the DB-API cursor.description
76 interface.
77 """
78 # Query the pg_catalog tables as cursor.description does not reliably
79 # return the nullable property and information_schema.columns does not
80 # contain details of materialized views.
81 cursor.execute(
82 """
83 SELECT
84 a.attname AS column_name,
85 NOT (a.attnotnull OR (t.typtype = 'd' AND t.typnotnull)) AS is_nullable,
86 pg_get_expr(ad.adbin, ad.adrelid) AS column_default,
87 CASE WHEN collname = 'default' THEN NULL ELSE collname END AS collation
88 FROM pg_attribute a
89 LEFT JOIN pg_attrdef ad ON a.attrelid = ad.adrelid AND a.attnum = ad.adnum
90 LEFT JOIN pg_collation co ON a.attcollation = co.oid
91 JOIN pg_type t ON a.atttypid = t.oid
92 JOIN pg_class c ON a.attrelid = c.oid
93 JOIN pg_namespace n ON c.relnamespace = n.oid
94 WHERE c.relkind IN ('f', 'm', 'p', 'r', 'v')
95 AND c.relname = %s
96 AND n.nspname NOT IN ('pg_catalog', 'pg_toast')
97 AND pg_catalog.pg_table_is_visible(c.oid)
98 """,
99 [table_name],
100 )
101 field_map = {line[0]: line[1:] for line in cursor.fetchall()}
102 cursor.execute(
103 "SELECT * FROM %s LIMIT 1" % self.connection.ops.quote_name(table_name)
104 )
105 return [
106 FieldInfo(
107 line.name,
108 line.type_code,
109 line.display_size,
110 line.internal_size,
111 line.precision,
112 line.scale,
113 *field_map[line.name],
114 )
115 for line in cursor.description
116 ]
118 def get_sequences(self, cursor, table_name, table_fields=()):
119 cursor.execute(
120 """
121 SELECT s.relname as sequence_name, col.attname
122 FROM pg_class s
123 JOIN pg_namespace sn ON sn.oid = s.relnamespace
124 JOIN
125 pg_depend d ON d.refobjid = s.oid
126 AND d.refclassid = 'pg_class'::regclass
127 JOIN
128 pg_attrdef ad ON ad.oid = d.objid
129 AND d.classid = 'pg_attrdef'::regclass
130 JOIN
131 pg_attribute col ON col.attrelid = ad.adrelid
132 AND col.attnum = ad.adnum
133 JOIN pg_class tbl ON tbl.oid = ad.adrelid
134 WHERE s.relkind = 'S'
135 AND d.deptype in ('a', 'n')
136 AND pg_catalog.pg_table_is_visible(tbl.oid)
137 AND tbl.relname = %s
138 """,
139 [table_name],
140 )
141 return [
142 {"name": row[0], "table": table_name, "column": row[1]}
143 for row in cursor.fetchall()
144 ]
146 def get_relations(self, cursor, table_name):
147 """
148 Return a dictionary of {field_name: (field_name_other_table, other_table)}
149 representing all relationships to the given table.
150 """
151 return {
152 row[0]: (row[2], row[1]) for row in self.get_key_columns(cursor, table_name)
153 }
155 def get_key_columns(self, cursor, table_name):
156 cursor.execute(
157 """
158 SELECT a1.attname, c2.relname, a2.attname
159 FROM pg_constraint con
160 LEFT JOIN pg_class c1 ON con.conrelid = c1.oid
161 LEFT JOIN pg_class c2 ON con.confrelid = c2.oid
162 LEFT JOIN
163 pg_attribute a1 ON c1.oid = a1.attrelid AND a1.attnum = con.conkey[1]
164 LEFT JOIN
165 pg_attribute a2 ON c2.oid = a2.attrelid AND a2.attnum = con.confkey[1]
166 WHERE
167 c1.relname = %s AND
168 con.contype = 'f' AND
169 c1.relnamespace = c2.relnamespace AND
170 pg_catalog.pg_table_is_visible(c1.oid)
171 """,
172 [table_name],
173 )
174 return cursor.fetchall()
176 def get_constraints(self, cursor, table_name):
177 """
178 Retrieve any constraints or keys (unique, pk, fk, check, index) across
179 one or more columns. Also retrieve the definition of expression-based
180 indexes.
181 """
182 constraints = {}
183 # Loop over the key table, collecting things as constraints. The column
184 # array must return column names in the same order in which they were
185 # created.
186 cursor.execute(
187 """
188 SELECT
189 c.conname,
190 array(
191 SELECT attname
192 FROM unnest(c.conkey) WITH ORDINALITY cols(colid, arridx)
193 JOIN pg_attribute AS ca ON cols.colid = ca.attnum
194 WHERE ca.attrelid = c.conrelid
195 ORDER BY cols.arridx
196 ),
197 c.contype,
198 (SELECT fkc.relname || '.' || fka.attname
199 FROM pg_attribute AS fka
200 JOIN pg_class AS fkc ON fka.attrelid = fkc.oid
201 WHERE fka.attrelid = c.confrelid AND fka.attnum = c.confkey[1]),
202 cl.reloptions
203 FROM pg_constraint AS c
204 JOIN pg_class AS cl ON c.conrelid = cl.oid
205 WHERE cl.relname = %s AND pg_catalog.pg_table_is_visible(cl.oid)
206 """,
207 [table_name],
208 )
209 for constraint, columns, kind, used_cols, options in cursor.fetchall():
210 constraints[constraint] = {
211 "columns": columns,
212 "primary_key": kind == "p",
213 "unique": kind in ["p", "u"],
214 "foreign_key": tuple(used_cols.split(".", 1)) if kind == "f" else None,
215 "check": kind == "c",
216 "index": False,
217 "definition": None,
218 "options": options,
219 }
220 # Now get indexes
221 cursor.execute(
222 """
223 SELECT
224 indexname,
225 array_agg(attname ORDER BY arridx),
226 indisunique,
227 indisprimary,
228 array_agg(ordering ORDER BY arridx),
229 amname,
230 exprdef,
231 s2.attoptions
232 FROM (
233 SELECT
234 c2.relname as indexname, idx.*, attr.attname, am.amname,
235 CASE
236 WHEN idx.indexprs IS NOT NULL THEN
237 pg_get_indexdef(idx.indexrelid)
238 END AS exprdef,
239 CASE am.amname
240 WHEN %s THEN
241 CASE (option & 1)
242 WHEN 1 THEN 'DESC' ELSE 'ASC'
243 END
244 END as ordering,
245 c2.reloptions as attoptions
246 FROM (
247 SELECT *
248 FROM
249 pg_index i,
250 unnest(i.indkey, i.indoption)
251 WITH ORDINALITY koi(key, option, arridx)
252 ) idx
253 LEFT JOIN pg_class c ON idx.indrelid = c.oid
254 LEFT JOIN pg_class c2 ON idx.indexrelid = c2.oid
255 LEFT JOIN pg_am am ON c2.relam = am.oid
256 LEFT JOIN
257 pg_attribute attr ON attr.attrelid = c.oid AND attr.attnum = idx.key
258 WHERE c.relname = %s AND pg_catalog.pg_table_is_visible(c.oid)
259 ) s2
260 GROUP BY indexname, indisunique, indisprimary, amname, exprdef, attoptions;
261 """,
262 [self.index_default_access_method, table_name],
263 )
264 for (
265 index,
266 columns,
267 unique,
268 primary,
269 orders,
270 type_,
271 definition,
272 options,
273 ) in cursor.fetchall():
274 if index not in constraints:
275 basic_index = (
276 type_ == self.index_default_access_method
277 and
278 # '_btree' references
279 # django.contrib.postgres.indexes.BTreeIndex.suffix.
280 not index.endswith("_btree")
281 and options is None
282 )
283 constraints[index] = {
284 "columns": columns if columns != [None] else [],
285 "orders": orders if orders != [None] else [],
286 "primary_key": primary,
287 "unique": unique,
288 "foreign_key": None,
289 "check": False,
290 "index": True,
291 "type": Index.suffix if basic_index else type_,
292 "definition": definition,
293 "options": options,
294 }
295 return constraints