Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/core/serializers/base.py: 25%
187 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1"""
2Module for abstract serializer/unserializer base classes.
3"""
4import pickle
5from io import StringIO
7from django.core.exceptions import ObjectDoesNotExist
8from django.db import models
# Unique sentinel returned by deserialize_m2m_values() and
# deserialize_fk_value() when a referenced object does not exist yet and the
# caller asked for forward references to be handled.
DEFER_FIELD = object()
class PickleSerializer:
    """
    Thin adapter exposing ``dumps()``/``loads()`` on top of ``pickle``, meant
    for signing.dumps()/loads() and cache backends.

    NOTE(review): ``pickle.loads()`` can execute arbitrary code while
    unpickling; only feed ``loads()`` data that comes from a trusted source.
    """

    def __init__(self, protocol=None):
        # Fall back to the newest protocol when the caller doesn't pick one.
        if protocol is None:
            protocol = pickle.HIGHEST_PROTOCOL
        self.protocol = protocol

    def dumps(self, obj):
        """Return ``obj`` pickled with the configured protocol."""
        return pickle.dumps(obj, protocol=self.protocol)

    def loads(self, data):
        """Return the object reconstructed from the pickled ``data``."""
        return pickle.loads(data)
class SerializerDoesNotExist(KeyError):
    """Raised when the requested serializer cannot be found."""
class SerializationError(Exception):
    """Raised when something goes wrong while serializing."""
class DeserializationError(Exception):
    """Raised when something goes wrong while deserializing."""

    @classmethod
    def WithData(cls, original_exc, model, fk, field_value):
        """
        Alternate constructor producing an error with a more explanatory
        message: the original exception followed by the model, the primary
        key and the offending field value.
        """
        details = (original_exc, model, fk, field_value)
        message = "%s: (%s:pk=%s) field_value was '%s'" % details
        return cls(message)
class M2MDeserializationError(Exception):
    """Raised when deserializing a ManyToManyField value goes wrong."""

    def __init__(self, original_exc, pk):
        # Keep the underlying exception and the offending key so callers can
        # build a more detailed error from them.
        self.original_exc, self.pk = original_exc, pk
class ProgressBar:
    """
    Draw a fixed-width textual progress bar on an output stream.

    ``output`` is any object providing ``write()`` and ``flush()``; a falsy
    ``output`` turns every ``update()`` call into a no-op. ``total_count`` is
    the number of items that corresponds to a completely filled bar.
    """

    # Number of character cells between the surrounding brackets.
    progress_width = 75

    def __init__(self, output, total_count):
        self.output = output
        self.total_count = total_count
        # Number of filled cells written by the most recent redraw.
        self.prev_done = 0

    def update(self, count):
        """
        Redraw the bar so that it reflects ``count`` processed items.

        Nothing is written when there is no output stream, when the total is
        unknown (zero or ``None``), or when the bar would not grow compared
        to the previous redraw.
        """
        # Without a usable total the percentage is undefined; skip drawing
        # rather than failing with ZeroDivisionError.
        if not self.output or not self.total_count:
            return
        perc = count * 100 // self.total_count
        # Clamp so that count > total_count can neither overflow the bar nor
        # skip the final newline.
        done = min(perc * self.progress_width // 100, self.progress_width)
        if self.prev_done >= done:
            return
        self.prev_done = done
        # A single-item run draws the bar once, so no carriage return is
        # needed to overwrite a previous rendering.
        cr = "" if self.total_count == 1 else "\r"
        self.output.write(
            cr + "[" + "." * done + " " * (self.progress_width - done) + "]"
        )
        if done == self.progress_width:
            self.output.write("\n")
        self.output.flush()
class Serializer:
    """
    Abstract base class for serializers.

    ``serialize()`` walks a queryset and delegates the actual output to hook
    methods that concrete subclasses provide.
    """

    # Whether the implemented serializer is reserved for internal Django use.
    internal_use_only = False
    progress_class = ProgressBar
    stream_class = StringIO

    def serialize(
        self,
        queryset,
        *,
        stream=None,
        fields=None,
        use_natural_foreign_keys=False,
        use_natural_primary_keys=False,
        progress_output=None,
        object_count=0,
        **options,
    ):
        """
        Serialize every object of ``queryset`` and return ``getvalue()``.

        ``fields`` optionally restricts the serialized fields by name;
        ``progress_output`` and ``object_count`` are handed to
        ``progress_class`` for progress reporting. Remaining keyword
        arguments are stored on ``self.options``.
        """
        self.options = options

        self.stream = self.stream_class() if stream is None else stream
        self.selected_fields = fields
        self.use_natural_foreign_keys = use_natural_foreign_keys
        self.use_natural_primary_keys = use_natural_primary_keys
        progress_bar = self.progress_class(progress_output, object_count)

        def is_selected(name):
            # No explicit selection means every field is wanted.
            return self.selected_fields is None or name in self.selected_fields

        self.start_serialization()
        self.first = True
        for count, obj in enumerate(queryset, start=1):
            self.start_object(obj)
            # Work with the concrete parent class' _meta rather than the
            # object's own _meta to avoid local_fields problems for proxy
            # models. Refs #17717.
            opts = obj._meta.concrete_model._meta
            # With natural primary keys, the pk field pointing at the parent
            # of a multi-table inheritance child must be serialized as well,
            # otherwise deserialization isn't possible.
            pk_parent = None
            if self.use_natural_primary_keys:
                pk = opts.pk
                if pk.remote_field and pk.remote_field.parent_link:
                    pk_parent = pk
            for field in opts.local_fields:
                if not (field.serialize or field is pk_parent):
                    continue
                if field.remote_field is None:
                    if is_selected(field.attname):
                        self.handle_field(obj, field)
                # Relation attnames are matched without their last three
                # characters (presumably the "_id" suffix).
                elif is_selected(field.attname[:-3]):
                    self.handle_fk_field(obj, field)
            for field in opts.local_many_to_many:
                if field.serialize and is_selected(field.attname):
                    self.handle_m2m_field(obj, field)
            self.end_object(obj)
            progress_bar.update(count)
            # Once the first object is done, the flag stays falsy.
            if self.first:
                self.first = False
        self.end_serialization()
        return self.getvalue()

    def start_serialization(self):
        """
        Hook invoked once, before the first object is serialized.
        """
        raise NotImplementedError(
            "subclasses of Serializer must provide a start_serialization() method"
        )

    def end_serialization(self):
        """
        Hook invoked once, after the last object has been serialized.
        """

    def start_object(self, obj):
        """
        Hook invoked when the serialization of ``obj`` begins.
        """
        raise NotImplementedError(
            "subclasses of Serializer must provide a start_object() method"
        )

    def end_object(self, obj):
        """
        Hook invoked when the serialization of ``obj`` is finished.
        """

    def handle_field(self, obj, field):
        """
        Hook invoked for each individual (non-relational) field of ``obj``.
        """
        raise NotImplementedError(
            "subclasses of Serializer must provide a handle_field() method"
        )

    def handle_fk_field(self, obj, field):
        """
        Hook invoked for each ForeignKey field of ``obj``.
        """
        raise NotImplementedError(
            "subclasses of Serializer must provide a handle_fk_field() method"
        )

    def handle_m2m_field(self, obj, field):
        """
        Hook invoked for each ManyToManyField of ``obj``.
        """
        raise NotImplementedError(
            "subclasses of Serializer must provide a handle_m2m_field() method"
        )

    def getvalue(self):
        """
        Return the stream's ``getvalue()`` result, or None when the output
        stream has no callable ``getvalue``.
        """
        getter = getattr(self.stream, "getvalue", None)
        if callable(getter):
            return self.stream.getvalue()
        return None
class Deserializer:
    """
    Abstract base class for deserializers.
    """

    def __init__(self, stream_or_string, **options):
        """
        Set up the deserializer from either a stream or a string; a string
        is wrapped in a ``StringIO`` so that ``self.stream`` is always a
        stream-like object.
        """
        self.options = options
        self.stream = (
            StringIO(stream_or_string)
            if isinstance(stream_or_string, str)
            else stream_or_string
        )

    def __iter__(self):
        # The deserializer is its own iterator.
        return self

    def __next__(self):
        """Iteration interface -- produce the next item of the stream."""
        raise NotImplementedError(
            "subclasses of Deserializer must provide a __next__() method"
        )
class DeserializedObject:
    """
    A deserialized model.

    Holds the not-yet-saved deserialized instance together with the
    many-to-many data that came with it.

    ``save()`` stores the instance and its many-to-many data in the
    database; ``save(save_m2m=False)`` stores only the instance's own fields
    and leaves the many-to-many relations untouched.
    """

    def __init__(self, obj, m2m_data=None, deferred_fields=None):
        self.object = obj
        self.m2m_data = m2m_data
        self.deferred_fields = deferred_fields

    def __repr__(self):
        cls_name = self.__class__.__name__
        label = self.object._meta.label
        return "<%s: %s(pk=%s)>" % (cls_name, label, self.object.pk)

    def save(self, save_m2m=True, using=None, **kwargs):
        """
        Save the wrapped instance and, unless ``save_m2m`` is false, its
        pending many-to-many data.
        """
        # Go through Model.save_base() directly: this bypasses any
        # model-defined save() and forces a raw save. raw=True is passed on
        # to any pre/post_save signals.
        models.Model.save_base(self.object, using=using, raw=True, **kwargs)
        if save_m2m and self.m2m_data:
            for accessor_name, object_list in self.m2m_data.items():
                related_manager = getattr(self.object, accessor_name)
                related_manager.set(object_list)

        # Drop the m2m data so that a second (possibly accidental) call to
        # save() doesn't save it twice.
        self.m2m_data = None

    def save_deferred_fields(self, using=None):
        """
        Resolve the values recorded in ``deferred_fields`` and save the
        instance again. Forward references are not tolerated here: a failing
        lookup is reported as a ``DeserializationError``.
        """
        self.m2m_data = {}
        for field, field_value in self.deferred_fields.items():
            meta = self.object._meta
            label = meta.app_label + "." + meta.model_name
            remote = field.remote_field
            if isinstance(remote, models.ManyToManyRel):
                try:
                    values = deserialize_m2m_values(
                        field, field_value, using, handle_forward_references=False
                    )
                except M2MDeserializationError as e:
                    raise DeserializationError.WithData(
                        e.original_exc, label, self.object.pk, e.pk
                    )
                self.m2m_data[field.name] = values
            elif isinstance(remote, models.ManyToOneRel):
                try:
                    value = deserialize_fk_value(
                        field, field_value, using, handle_forward_references=False
                    )
                except Exception as e:
                    raise DeserializationError.WithData(
                        e, label, self.object.pk, field_value
                    )
                setattr(self.object, field.attname, value)
        self.save()
def build_instance(Model, data, db):
    """
    Build a model instance from ``data``.

    If ``data`` carries no primary key and the model supports natural keys
    (the default manager has ``get_by_natural_key()`` and the model has
    ``natural_key()``), try to look the primary key up in database ``db``.
    When a matching row exists, its primary key is written into ``data``
    (the mapping is modified in place) before the final instance is built.
    """
    default_manager = Model._meta.default_manager
    pk_attname = Model._meta.pk.attname
    if (
        data.get(pk_attname) is None
        and hasattr(default_manager, "get_by_natural_key")
        and hasattr(Model, "natural_key")
    ):
        obj = Model(**data)
        # Bind the temporary instance to the target database before
        # computing its natural key, so that a natural_key() implementation
        # that follows relations queries ``db`` instead of the default
        # database.
        obj._state.db = db
        natural_key = obj.natural_key()
        try:
            data[pk_attname] = Model._meta.pk.to_python(
                default_manager.db_manager(db).get_by_natural_key(*natural_key).pk
            )
        except Model.DoesNotExist:
            # No existing row for this natural key: build the instance
            # without a primary key.
            pass
    return Model(**data)
def deserialize_m2m_values(field, field_value, using, handle_forward_references):
    """
    Convert the serialized value of a many-to-many ``field`` to a list of
    primary keys of the related model.

    Each item of ``field_value`` is either a natural key (a non-string
    iterable, resolved through the related model's default manager on
    database ``using`` when that manager supports natural keys) or a raw
    primary key value. Return ``DEFER_FIELD`` if a referenced object doesn't
    exist and ``handle_forward_references`` is true; raise
    ``M2MDeserializationError`` for any other failure.
    """
    model = field.remote_field.model
    supports_natural_keys = hasattr(model._default_manager, "get_by_natural_key")

    def m2m_convert(value):
        is_natural_key = (
            supports_natural_keys
            and hasattr(value, "__iter__")
            and not isinstance(value, str)
        )
        if is_natural_key:
            manager = model._default_manager.db_manager(using)
            return manager.get_by_natural_key(*value).pk
        return model._meta.pk.to_python(value)

    try:
        pks_iter = iter(field_value)
    except TypeError as e:
        raise M2MDeserializationError(e, field_value)
    try:
        values = []
        # A plain loop (rather than a comprehension) keeps ``pk`` available
        # to the error handler below.
        for pk in pks_iter:
            values.append(m2m_convert(pk))
        return values
    except Exception as e:
        if handle_forward_references and isinstance(e, ObjectDoesNotExist):
            return DEFER_FIELD
        raise M2MDeserializationError(e, pk)
377def deserialize_fk_value(field, field_value, using, handle_forward_references):
378 if field_value is None:
379 return None
380 model = field.remote_field.model
381 default_manager = model._default_manager
382 field_name = field.remote_field.field_name
383 if (
384 hasattr(default_manager, "get_by_natural_key")
385 and hasattr(field_value, "__iter__")
386 and not isinstance(field_value, str)
387 ):
388 try:
389 obj = default_manager.db_manager(using).get_by_natural_key(*field_value)
390 except ObjectDoesNotExist:
391 if handle_forward_references:
392 return DEFER_FIELD
393 else:
394 raise
395 value = getattr(obj, field_name)
396 # If this is a natural foreign key to an object that has a FK/O2O as
397 # the foreign key, use the FK value.
398 if model._meta.pk.remote_field:
399 value = value.pk
400 return value
401 return model._meta.get_field(field_name).to_python(field_value)