Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/gitdb/stream.py: 27%
291 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors
2#
3# This module is part of GitDB and is released under
4# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
6from io import BytesIO
8import mmap
9import os
10import sys
11import zlib
13from gitdb.fun import (
14 msb_size,
15 stream_copy,
16 apply_delta_data,
17 connect_deltas,
18 delta_types
19)
21from gitdb.util import (
22 allocate_memory,
23 LazyMixin,
24 make_sha,
25 write,
26 close,
27)
29from gitdb.const import NULL_BYTE, BYTE_SPACE
30from gitdb.utils.encoding import force_bytes
32has_perf_mod = False
33try:
34 from gitdb_speedups._perf import apply_delta as c_apply_delta
35 has_perf_mod = True
36except ImportError:
37 pass
39__all__ = ('DecompressMemMapReader', 'FDCompressedSha1Writer', 'DeltaApplyReader',
40 'Sha1Writer', 'FlexibleSha1Writer', 'ZippedStoreShaWriter', 'FDCompressedSha1Writer',
41 'FDStream', 'NullStream')
44#{ RO Streams
46class DecompressMemMapReader(LazyMixin):
48 """Reads data in chunks from a memory map and decompresses it. The client sees
49 only the uncompressed data, respective file-like read calls are handling on-demand
50 buffered decompression accordingly
52 A constraint on the total size of bytes is activated, simulating
53 a logical file within a possibly larger physical memory area
55 To read efficiently, you clearly don't want to read individual bytes, instead,
56 read a few kilobytes at least.
58 **Note:** The chunk-size should be carefully selected as it will involve quite a bit
59 of string copying due to the way the zlib is implemented. Its very wasteful,
60 hence we try to find a good tradeoff between allocation time and number of
61 times we actually allocate. An own zlib implementation would be good here
62 to better support streamed reading - it would only need to keep the mmap
63 and decompress it into chunks, that's all ... """
64 __slots__ = ('_m', '_zip', '_buf', '_buflen', '_br', '_cws', '_cwe', '_s', '_close',
65 '_cbr', '_phi')
67 max_read_size = 512 * 1024 # currently unused
69 def __init__(self, m, close_on_deletion, size=None):
70 """Initialize with mmap for stream reading
71 :param m: must be content data - use new if you have object data and no size"""
72 self._m = m
73 self._zip = zlib.decompressobj()
74 self._buf = None # buffer of decompressed bytes
75 self._buflen = 0 # length of bytes in buffer
76 if size is not None:
77 self._s = size # size of uncompressed data to read in total
78 self._br = 0 # num uncompressed bytes read
79 self._cws = 0 # start byte of compression window
80 self._cwe = 0 # end byte of compression window
81 self._cbr = 0 # number of compressed bytes read
82 self._phi = False # is True if we parsed the header info
83 self._close = close_on_deletion # close the memmap on deletion ?
85 def _set_cache_(self, attr):
86 assert attr == '_s'
87 # only happens for size, which is a marker to indicate we still
88 # have to parse the header from the stream
89 self._parse_header_info()
91 def __del__(self):
92 self.close()
94 def _parse_header_info(self):
95 """If this stream contains object data, parse the header info and skip the
96 stream to a point where each read will yield object content
98 :return: parsed type_string, size"""
99 # read header
100 # should really be enough, cgit uses 8192 I believe
101 # And for good reason !! This needs to be that high for the header to be read correctly in all cases
102 maxb = 8192
103 self._s = maxb
104 hdr = self.read(maxb)
105 hdrend = hdr.find(NULL_BYTE)
106 typ, size = hdr[:hdrend].split(BYTE_SPACE)
107 size = int(size)
108 self._s = size
110 # adjust internal state to match actual header length that we ignore
111 # The buffer will be depleted first on future reads
112 self._br = 0
113 hdrend += 1
114 self._buf = BytesIO(hdr[hdrend:])
115 self._buflen = len(hdr) - hdrend
117 self._phi = True
119 return typ, size
121 #{ Interface
123 @classmethod
124 def new(self, m, close_on_deletion=False):
125 """Create a new DecompressMemMapReader instance for acting as a read-only stream
126 This method parses the object header from m and returns the parsed
127 type and size, as well as the created stream instance.
129 :param m: memory map on which to operate. It must be object data ( header + contents )
130 :param close_on_deletion: if True, the memory map will be closed once we are
131 being deleted"""
132 inst = DecompressMemMapReader(m, close_on_deletion, 0)
133 typ, size = inst._parse_header_info()
134 return typ, size, inst
136 def data(self):
137 """:return: random access compatible data we are working on"""
138 return self._m
140 def close(self):
141 """Close our underlying stream of compressed bytes if this was allowed during initialization
142 :return: True if we closed the underlying stream
143 :note: can be called safely
144 """
145 if self._close:
146 if hasattr(self._m, 'close'):
147 self._m.close()
148 self._close = False
149 # END handle resource freeing
151 def compressed_bytes_read(self):
152 """
153 :return: number of compressed bytes read. This includes the bytes it
154 took to decompress the header ( if there was one )"""
155 # ABSTRACT: When decompressing a byte stream, it can be that the first
156 # x bytes which were requested match the first x bytes in the loosely
157 # compressed datastream. This is the worst-case assumption that the reader
158 # does, it assumes that it will get at least X bytes from X compressed bytes
159 # in call cases.
160 # The caveat is that the object, according to our known uncompressed size,
161 # is already complete, but there are still some bytes left in the compressed
162 # stream that contribute to the amount of compressed bytes.
163 # How can we know that we are truly done, and have read all bytes we need
164 # to read ?
165 # Without help, we cannot know, as we need to obtain the status of the
166 # decompression. If it is not finished, we need to decompress more data
167 # until it is finished, to yield the actual number of compressed bytes
168 # belonging to the decompressed object
169 # We are using a custom zlib module for this, if its not present,
170 # we try to put in additional bytes up for decompression if feasible
171 # and check for the unused_data.
173 # Only scrub the stream forward if we are officially done with the
174 # bytes we were to have.
175 if self._br == self._s and not self._zip.unused_data:
176 # manipulate the bytes-read to allow our own read method to continue
177 # but keep the window at its current position
178 self._br = 0
179 if hasattr(self._zip, 'status'):
180 while self._zip.status == zlib.Z_OK:
181 self.read(mmap.PAGESIZE)
182 # END scrub-loop custom zlib
183 else:
184 # pass in additional pages, until we have unused data
185 while not self._zip.unused_data and self._cbr != len(self._m):
186 self.read(mmap.PAGESIZE)
187 # END scrub-loop default zlib
188 # END handle stream scrubbing
190 # reset bytes read, just to be sure
191 self._br = self._s
192 # END handle stream scrubbing
194 # unused data ends up in the unconsumed tail, which was removed
195 # from the count already
196 return self._cbr
198 #} END interface
200 def seek(self, offset, whence=getattr(os, 'SEEK_SET', 0)):
201 """Allows to reset the stream to restart reading
202 :raise ValueError: If offset and whence are not 0"""
203 if offset != 0 or whence != getattr(os, 'SEEK_SET', 0):
204 raise ValueError("Can only seek to position 0")
205 # END handle offset
207 self._zip = zlib.decompressobj()
208 self._br = self._cws = self._cwe = self._cbr = 0
209 if self._phi:
210 self._phi = False
211 del(self._s) # trigger header parsing on first access
212 # END skip header
214 def read(self, size=-1):
215 if size < 1:
216 size = self._s - self._br
217 else:
218 size = min(size, self._s - self._br)
219 # END clamp size
221 if size == 0:
222 return b''
223 # END handle depletion
225 # deplete the buffer, then just continue using the decompress object
226 # which has an own buffer. We just need this to transparently parse the
227 # header from the zlib stream
228 dat = b''
229 if self._buf:
230 if self._buflen >= size:
231 # have enough data
232 dat = self._buf.read(size)
233 self._buflen -= size
234 self._br += size
235 return dat
236 else:
237 dat = self._buf.read() # ouch, duplicates data
238 size -= self._buflen
239 self._br += self._buflen
241 self._buflen = 0
242 self._buf = None
243 # END handle buffer len
244 # END handle buffer
246 # decompress some data
247 # Abstract: zlib needs to operate on chunks of our memory map ( which may
248 # be large ), as it will otherwise and always fill in the 'unconsumed_tail'
249 # attribute which possible reads our whole map to the end, forcing
250 # everything to be read from disk even though just a portion was requested.
251 # As this would be a nogo, we workaround it by passing only chunks of data,
252 # moving the window into the memory map along as we decompress, which keeps
253 # the tail smaller than our chunk-size. This causes 'only' the chunk to be
254 # copied once, and another copy of a part of it when it creates the unconsumed
255 # tail. We have to use it to hand in the appropriate amount of bytes during
256 # the next read.
257 tail = self._zip.unconsumed_tail
258 if tail:
259 # move the window, make it as large as size demands. For code-clarity,
260 # we just take the chunk from our map again instead of reusing the unconsumed
261 # tail. The latter one would safe some memory copying, but we could end up
262 # with not getting enough data uncompressed, so we had to sort that out as well.
263 # Now we just assume the worst case, hence the data is uncompressed and the window
264 # needs to be as large as the uncompressed bytes we want to read.
265 self._cws = self._cwe - len(tail)
266 self._cwe = self._cws + size
267 else:
268 cws = self._cws
269 self._cws = self._cwe
270 self._cwe = cws + size
271 # END handle tail
273 # if window is too small, make it larger so zip can decompress something
274 if self._cwe - self._cws < 8:
275 self._cwe = self._cws + 8
276 # END adjust winsize
278 # takes a slice, but doesn't copy the data, it says ...
279 indata = self._m[self._cws:self._cwe]
281 # get the actual window end to be sure we don't use it for computations
282 self._cwe = self._cws + len(indata)
283 dcompdat = self._zip.decompress(indata, size)
284 # update the amount of compressed bytes read
285 # We feed possibly overlapping chunks, which is why the unconsumed tail
286 # has to be taken into consideration, as well as the unused data
287 # if we hit the end of the stream
288 # NOTE: Behavior changed in PY2.7 onward, which requires special handling to make the tests work properly.
289 # They are thorough, and I assume it is truly working.
290 # Why is this logic as convoluted as it is ? Please look at the table in
291 # https://github.com/gitpython-developers/gitdb/issues/19 to learn about the test-results.
292 # Basically, on py2.6, you want to use branch 1, whereas on all other python version, the second branch
293 # will be the one that works.
294 # However, the zlib VERSIONs as well as the platform check is used to further match the entries in the
295 # table in the github issue. This is it ... it was the only way I could make this work everywhere.
296 # IT's CERTAINLY GOING TO BITE US IN THE FUTURE ... .
297 if zlib.ZLIB_VERSION in ('1.2.7', '1.2.5') and not sys.platform == 'darwin':
298 unused_datalen = len(self._zip.unconsumed_tail)
299 else:
300 unused_datalen = len(self._zip.unconsumed_tail) + len(self._zip.unused_data)
301 # # end handle very special case ...
303 self._cbr += len(indata) - unused_datalen
304 self._br += len(dcompdat)
306 if dat:
307 dcompdat = dat + dcompdat
308 # END prepend our cached data
310 # it can happen, depending on the compression, that we get less bytes
311 # than ordered as it needs the final portion of the data as well.
312 # Recursively resolve that.
313 # Note: dcompdat can be empty even though we still appear to have bytes
314 # to read, if we are called by compressed_bytes_read - it manipulates
315 # us to empty the stream
316 if dcompdat and (len(dcompdat) - len(dat)) < size and self._br < self._s:
317 dcompdat += self.read(size - len(dcompdat))
318 # END handle special case
319 return dcompdat
322class DeltaApplyReader(LazyMixin):
324 """A reader which dynamically applies pack deltas to a base object, keeping the
325 memory demands to a minimum.
327 The size of the final object is only obtainable once all deltas have been
328 applied, unless it is retrieved from a pack index.
330 The uncompressed Delta has the following layout (MSB being a most significant
331 bit encoded dynamic size):
333 * MSB Source Size - the size of the base against which the delta was created
334 * MSB Target Size - the size of the resulting data after the delta was applied
335 * A list of one byte commands (cmd) which are followed by a specific protocol:
337 * cmd & 0x80 - copy delta_data[offset:offset+size]
339 * Followed by an encoded offset into the delta data
340 * Followed by an encoded size of the chunk to copy
342 * cmd & 0x7f - insert
344 * insert cmd bytes from the delta buffer into the output stream
346 * cmd == 0 - invalid operation ( or error in delta stream )
347 """
348 __slots__ = (
349 "_bstream", # base stream to which to apply the deltas
350 "_dstreams", # tuple of delta stream readers
351 "_mm_target", # memory map of the delta-applied data
352 "_size", # actual number of bytes in _mm_target
353 "_br" # number of bytes read
354 )
356 #{ Configuration
357 k_max_memory_move = 250 * 1000 * 1000
358 #} END configuration
360 def __init__(self, stream_list):
361 """Initialize this instance with a list of streams, the first stream being
362 the delta to apply on top of all following deltas, the last stream being the
363 base object onto which to apply the deltas"""
364 assert len(stream_list) > 1, "Need at least one delta and one base stream"
366 self._bstream = stream_list[-1]
367 self._dstreams = tuple(stream_list[:-1])
368 self._br = 0
370 def _set_cache_too_slow_without_c(self, attr):
371 # the direct algorithm is fastest and most direct if there is only one
372 # delta. Also, the extra overhead might not be worth it for items smaller
373 # than X - definitely the case in python, every function call costs
374 # huge amounts of time
375 # if len(self._dstreams) * self._bstream.size < self.k_max_memory_move:
376 if len(self._dstreams) == 1:
377 return self._set_cache_brute_(attr)
379 # Aggregate all deltas into one delta in reverse order. Hence we take
380 # the last delta, and reverse-merge its ancestor delta, until we receive
381 # the final delta data stream.
382 dcl = connect_deltas(self._dstreams)
384 # call len directly, as the (optional) c version doesn't implement the sequence
385 # protocol
386 if dcl.rbound() == 0:
387 self._size = 0
388 self._mm_target = allocate_memory(0)
389 return
390 # END handle empty list
392 self._size = dcl.rbound()
393 self._mm_target = allocate_memory(self._size)
395 bbuf = allocate_memory(self._bstream.size)
396 stream_copy(self._bstream.read, bbuf.write, self._bstream.size, 256 * mmap.PAGESIZE)
398 # APPLY CHUNKS
399 write = self._mm_target.write
400 dcl.apply(bbuf, write)
402 self._mm_target.seek(0)
404 def _set_cache_brute_(self, attr):
405 """If we are here, we apply the actual deltas"""
406 # TODO: There should be a special case if there is only one stream
407 # Then the default-git algorithm should perform a tad faster, as the
408 # delta is not peaked into, causing less overhead.
409 buffer_info_list = list()
410 max_target_size = 0
411 for dstream in self._dstreams:
412 buf = dstream.read(512) # read the header information + X
413 offset, src_size = msb_size(buf)
414 offset, target_size = msb_size(buf, offset)
415 buffer_info_list.append((buf[offset:], offset, src_size, target_size))
416 max_target_size = max(max_target_size, target_size)
417 # END for each delta stream
419 # sanity check - the first delta to apply should have the same source
420 # size as our actual base stream
421 base_size = self._bstream.size
422 target_size = max_target_size
424 # if we have more than 1 delta to apply, we will swap buffers, hence we must
425 # assure that all buffers we use are large enough to hold all the results
426 if len(self._dstreams) > 1:
427 base_size = target_size = max(base_size, max_target_size)
428 # END adjust buffer sizes
430 # Allocate private memory map big enough to hold the first base buffer
431 # We need random access to it
432 bbuf = allocate_memory(base_size)
433 stream_copy(self._bstream.read, bbuf.write, base_size, 256 * mmap.PAGESIZE)
435 # allocate memory map large enough for the largest (intermediate) target
436 # We will use it as scratch space for all delta ops. If the final
437 # target buffer is smaller than our allocated space, we just use parts
438 # of it upon return.
439 tbuf = allocate_memory(target_size)
441 # for each delta to apply, memory map the decompressed delta and
442 # work on the op-codes to reconstruct everything.
443 # For the actual copying, we use a seek and write pattern of buffer
444 # slices.
445 final_target_size = None
446 for (dbuf, offset, src_size, target_size), dstream in zip(reversed(buffer_info_list), reversed(self._dstreams)):
447 # allocate a buffer to hold all delta data - fill in the data for
448 # fast access. We do this as we know that reading individual bytes
449 # from our stream would be slower than necessary ( although possible )
450 # The dbuf buffer contains commands after the first two MSB sizes, the
451 # offset specifies the amount of bytes read to get the sizes.
452 ddata = allocate_memory(dstream.size - offset)
453 ddata.write(dbuf)
454 # read the rest from the stream. The size we give is larger than necessary
455 stream_copy(dstream.read, ddata.write, dstream.size, 256 * mmap.PAGESIZE)
457 #######################################################################
458 if 'c_apply_delta' in globals():
459 c_apply_delta(bbuf, ddata, tbuf)
460 else:
461 apply_delta_data(bbuf, src_size, ddata, len(ddata), tbuf.write)
462 #######################################################################
464 # finally, swap out source and target buffers. The target is now the
465 # base for the next delta to apply
466 bbuf, tbuf = tbuf, bbuf
467 bbuf.seek(0)
468 tbuf.seek(0)
469 final_target_size = target_size
470 # END for each delta to apply
472 # its already seeked to 0, constrain it to the actual size
473 # NOTE: in the end of the loop, it swaps buffers, hence our target buffer
474 # is not tbuf, but bbuf !
475 self._mm_target = bbuf
476 self._size = final_target_size
478 #{ Configuration
479 if not has_perf_mod: 479 ↛ 482line 479 didn't jump to line 482, because the condition on line 479 was never false
480 _set_cache_ = _set_cache_brute_
481 else:
482 _set_cache_ = _set_cache_too_slow_without_c
484 #} END configuration
486 def read(self, count=0):
487 bl = self._size - self._br # bytes left
488 if count < 1 or count > bl:
489 count = bl
490 # NOTE: we could check for certain size limits, and possibly
491 # return buffers instead of strings to prevent byte copying
492 data = self._mm_target.read(count)
493 self._br += len(data)
494 return data
496 def seek(self, offset, whence=getattr(os, 'SEEK_SET', 0)):
497 """Allows to reset the stream to restart reading
499 :raise ValueError: If offset and whence are not 0"""
500 if offset != 0 or whence != getattr(os, 'SEEK_SET', 0):
501 raise ValueError("Can only seek to position 0")
502 # END handle offset
503 self._br = 0
504 self._mm_target.seek(0)
506 #{ Interface
508 @classmethod
509 def new(cls, stream_list):
510 """
511 Convert the given list of streams into a stream which resolves deltas
512 when reading from it.
514 :param stream_list: two or more stream objects, first stream is a Delta
515 to the object that you want to resolve, followed by N additional delta
516 streams. The list's last stream must be a non-delta stream.
518 :return: Non-Delta OPackStream object whose stream can be used to obtain
519 the decompressed resolved data
520 :raise ValueError: if the stream list cannot be handled"""
521 if len(stream_list) < 2:
522 raise ValueError("Need at least two streams")
523 # END single object special handling
525 if stream_list[-1].type_id in delta_types:
526 raise ValueError(
527 "Cannot resolve deltas if there is no base object stream, last one was type: %s" % stream_list[-1].type)
528 # END check stream
529 return cls(stream_list)
531 #} END interface
533 #{ OInfo like Interface
535 @property
536 def type(self):
537 return self._bstream.type
539 @property
540 def type_id(self):
541 return self._bstream.type_id
543 @property
544 def size(self):
545 """:return: number of uncompressed bytes in the stream"""
546 return self._size
548 #} END oinfo like interface
551#} END RO streams
554#{ W Streams
556class Sha1Writer:
558 """Simple stream writer which produces a sha whenever you like as it degests
559 everything it is supposed to write"""
560 __slots__ = "sha1"
562 def __init__(self):
563 self.sha1 = make_sha()
565 #{ Stream Interface
567 def write(self, data):
568 """:raise IOError: If not all bytes could be written
569 :param data: byte object
570 :return: length of incoming data"""
572 self.sha1.update(data)
574 return len(data)
576 # END stream interface
578 #{ Interface
580 def sha(self, as_hex=False):
581 """:return: sha so far
582 :param as_hex: if True, sha will be hex-encoded, binary otherwise"""
583 if as_hex:
584 return self.sha1.hexdigest()
585 return self.sha1.digest()
587 #} END interface
590class FlexibleSha1Writer(Sha1Writer):
592 """Writer producing a sha1 while passing on the written bytes to the given
593 write function"""
594 __slots__ = 'writer'
596 def __init__(self, writer):
597 Sha1Writer.__init__(self)
598 self.writer = writer
600 def write(self, data):
601 Sha1Writer.write(self, data)
602 self.writer(data)
605class ZippedStoreShaWriter(Sha1Writer):
607 """Remembers everything someone writes to it and generates a sha"""
608 __slots__ = ('buf', 'zip')
610 def __init__(self):
611 Sha1Writer.__init__(self)
612 self.buf = BytesIO()
613 self.zip = zlib.compressobj(zlib.Z_BEST_SPEED)
615 def __getattr__(self, attr):
616 return getattr(self.buf, attr)
618 def write(self, data):
619 alen = Sha1Writer.write(self, data)
620 self.buf.write(self.zip.compress(data))
622 return alen
624 def close(self):
625 self.buf.write(self.zip.flush())
627 def seek(self, offset, whence=getattr(os, 'SEEK_SET', 0)):
628 """Seeking currently only supports to rewind written data
629 Multiple writes are not supported"""
630 if offset != 0 or whence != getattr(os, 'SEEK_SET', 0):
631 raise ValueError("Can only seek to position 0")
632 # END handle offset
633 self.buf.seek(0)
635 def getvalue(self):
636 """:return: string value from the current stream position to the end"""
637 return self.buf.getvalue()
640class FDCompressedSha1Writer(Sha1Writer):
642 """Digests data written to it, making the sha available, then compress the
643 data and write it to the file descriptor
645 **Note:** operates on raw file descriptors
646 **Note:** for this to work, you have to use the close-method of this instance"""
647 __slots__ = ("fd", "sha1", "zip")
649 # default exception
650 exc = IOError("Failed to write all bytes to filedescriptor")
652 def __init__(self, fd):
653 super().__init__()
654 self.fd = fd
655 self.zip = zlib.compressobj(zlib.Z_BEST_SPEED)
657 #{ Stream Interface
659 def write(self, data):
660 """:raise IOError: If not all bytes could be written
661 :return: length of incoming data"""
662 self.sha1.update(data)
663 cdata = self.zip.compress(data)
664 bytes_written = write(self.fd, cdata)
666 if bytes_written != len(cdata):
667 raise self.exc
669 return len(data)
671 def close(self):
672 remainder = self.zip.flush()
673 if write(self.fd, remainder) != len(remainder):
674 raise self.exc
675 return close(self.fd)
677 #} END stream interface
680class FDStream:
682 """A simple wrapper providing the most basic functions on a file descriptor
683 with the fileobject interface. Cannot use os.fdopen as the resulting stream
684 takes ownership"""
685 __slots__ = ("_fd", '_pos')
687 def __init__(self, fd):
688 self._fd = fd
689 self._pos = 0
691 def write(self, data):
692 self._pos += len(data)
693 os.write(self._fd, data)
695 def read(self, count=0):
696 if count == 0:
697 count = os.path.getsize(self._filepath)
698 # END handle read everything
700 bytes = os.read(self._fd, count)
701 self._pos += len(bytes)
702 return bytes
704 def fileno(self):
705 return self._fd
707 def tell(self):
708 return self._pos
710 def close(self):
711 close(self._fd)
714class NullStream:
716 """A stream that does nothing but providing a stream interface.
717 Use it like /dev/null"""
718 __slots__ = tuple()
720 def read(self, size=0):
721 return ''
723 def close(self):
724 pass
726 def write(self, data):
727 return len(data)
730#} END W streams