Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/db/models/sql/subqueries.py: 58%
88 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1"""
2Query subclasses which provide extra functionality beyond simple data retrieval.
3"""
5from django.core.exceptions import FieldError
6from django.db.models.sql.constants import CURSOR, GET_ITERATOR_CHUNK_SIZE, NO_RESULTS
7from django.db.models.sql.query import Query
9__all__ = ["DeleteQuery", "UpdateQuery", "InsertQuery", "AggregateQuery"]
12class DeleteQuery(Query):
13 """A DELETE SQL query."""
15 compiler = "SQLDeleteCompiler"
17 def do_query(self, table, where, using):
18 self.alias_map = {table: self.alias_map[table]}
19 self.where = where
20 cursor = self.get_compiler(using).execute_sql(CURSOR)
21 if cursor: 21 ↛ 24line 21 didn't jump to line 24, because the condition on line 21 was never false
22 with cursor:
23 return cursor.rowcount
24 return 0
26 def delete_batch(self, pk_list, using):
27 """
28 Set up and execute delete queries for all the objects in pk_list.
30 More than one physical query may be executed if there are a
31 lot of values in pk_list.
32 """
33 # number of objects deleted
34 num_deleted = 0
35 field = self.get_meta().pk
36 for offset in range(0, len(pk_list), GET_ITERATOR_CHUNK_SIZE):
37 self.clear_where()
38 self.add_filter(
39 f"{field.attname}__in",
40 pk_list[offset : offset + GET_ITERATOR_CHUNK_SIZE],
41 )
42 num_deleted += self.do_query(
43 self.get_meta().db_table, self.where, using=using
44 )
45 return num_deleted
48class UpdateQuery(Query):
49 """An UPDATE SQL query."""
51 compiler = "SQLUpdateCompiler"
53 def __init__(self, *args, **kwargs):
54 super().__init__(*args, **kwargs)
55 self._setup_query()
57 def _setup_query(self):
58 """
59 Run on initialization and at the end of chaining. Any attributes that
60 would normally be set in __init__() should go here instead.
61 """
62 self.values = []
63 self.related_ids = None
64 self.related_updates = {}
66 def clone(self):
67 obj = super().clone()
68 obj.related_updates = self.related_updates.copy()
69 return obj
71 def update_batch(self, pk_list, values, using):
72 self.add_update_values(values)
73 for offset in range(0, len(pk_list), GET_ITERATOR_CHUNK_SIZE):
74 self.clear_where()
75 self.add_filter(
76 "pk__in", pk_list[offset : offset + GET_ITERATOR_CHUNK_SIZE]
77 )
78 self.get_compiler(using).execute_sql(NO_RESULTS)
80 def add_update_values(self, values):
81 """
82 Convert a dictionary of field name to value mappings into an update
83 query. This is the entry point for the public update() method on
84 querysets.
85 """
86 values_seq = []
87 for name, val in values.items():
88 field = self.get_meta().get_field(name)
89 direct = (
90 not (field.auto_created and not field.concrete) or not field.concrete
91 )
92 model = field.model._meta.concrete_model
93 if not direct or (field.is_relation and field.many_to_many):
94 raise FieldError(
95 "Cannot update model field %r (only non-relations and "
96 "foreign keys permitted)." % field
97 )
98 if model is not self.get_meta().concrete_model:
99 self.add_related_update(model, field, val)
100 continue
101 values_seq.append((field, model, val))
102 return self.add_update_fields(values_seq)
104 def add_update_fields(self, values_seq):
105 """
106 Append a sequence of (field, model, value) triples to the internal list
107 that will be used to generate the UPDATE query. Might be more usefully
108 called add_update_targets() to hint at the extra information here.
109 """
110 for field, model, val in values_seq:
111 if hasattr(val, "resolve_expression"): 111 ↛ 113line 111 didn't jump to line 113, because the condition on line 111 was never true
112 # Resolve expressions here so that annotations are no longer needed
113 val = val.resolve_expression(self, allow_joins=False, for_save=True)
114 self.values.append((field, model, val))
116 def add_related_update(self, model, field, value):
117 """
118 Add (name, value) to an update query for an ancestor model.
120 Update are coalesced so that only one update query per ancestor is run.
121 """
122 self.related_updates.setdefault(model, []).append((field, None, value))
124 def get_related_updates(self):
125 """
126 Return a list of query objects: one for each update required to an
127 ancestor model. Each query will have the same filtering conditions as
128 the current query but will only update a single table.
129 """
130 if not self.related_updates: 130 ↛ 132line 130 didn't jump to line 132, because the condition on line 130 was never false
131 return []
132 result = []
133 for model, values in self.related_updates.items():
134 query = UpdateQuery(model)
135 query.values = values
136 if self.related_ids is not None:
137 query.add_filter("pk__in", self.related_ids)
138 result.append(query)
139 return result
142class InsertQuery(Query):
143 compiler = "SQLInsertCompiler"
145 def __init__(self, *args, ignore_conflicts=False, **kwargs):
146 super().__init__(*args, **kwargs)
147 self.fields = []
148 self.objs = []
149 self.ignore_conflicts = ignore_conflicts
151 def insert_values(self, fields, objs, raw=False):
152 self.fields = fields
153 self.objs = objs
154 self.raw = raw
157class AggregateQuery(Query):
158 """
159 Take another query as a parameter to the FROM clause and only select the
160 elements in the provided list.
161 """
163 compiler = "SQLAggregateCompiler"
165 def __init__(self, model, inner_query):
166 self.inner_query = inner_query
167 super().__init__(model)