Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/coreapi/codecs/download.py: 19%

85 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1# coding: utf-8 

2from coreapi.codecs.base import BaseCodec 

3from coreapi.compat import urlparse 

4from coreapi.utils import DownloadedFile, guess_extension 

5import cgi 

6import os 

7import posixpath 

8import tempfile 

9 

10 

def _unique_output_path(path):
    """
    Return a variant of `path` that does not currently exist on disk.

    The path is returned unchanged when nothing exists there. Otherwise a
    counter in parentheses is inserted before the extension, starting at 1
    and increasing until an unused name is found.

    For example: '/a/b/c.txt', then '/a/b/c (1).txt', then '/a/b/c (2).txt'...
    """
    stem, extension = os.path.splitext(path)
    if not os.path.exists(path):
        return path

    counter = 1
    while True:
        candidate = "%s (%d)%s" % (stem, counter, extension)
        if not os.path.exists(candidate):
            return candidate
        counter += 1

26 

27 

def _safe_filename(filename):
    """
    Reduce `filename` to a conservative set of characters.

    Any directory part is discarded, then only alphanumeric characters,
    spaces, dots, underscores and hyphens are kept. Surrounding whitespace
    is trimmed first, followed by any leading or trailing dots.
    """
    allowed_punctuation = (' ', '.', '_', '-')

    kept = []
    for char in os.path.basename(filename):
        if char.isalnum() or char in allowed_punctuation:
            kept.append(char)

    cleaned = ''.join(kept)
    cleaned = cleaned.strip()
    return cleaned.strip('.')

41 

42 

def _get_filename_from_content_disposition(content_disposition):
    """
    Determine an output filename based on the `Content-Disposition` header.

    The extended `filename*` parameter (written as charset'language'value,
    with the value percent-encoded) is preferred. If it is absent or cannot
    be decoded, the plain `filename` parameter is used instead.

    Returns the sanitized filename, or `None` if the header carries no
    filename parameter.
    """
    # Only the parameters are needed; the disposition type is discarded.
    _, params = cgi.parse_header(content_disposition)

    if 'filename*' in params:
        try:
            charset, _lang, encoded = params['filename*'].split('\'', 2)
            # Percent-decode to raw bytes, then decode those bytes with the
            # declared charset. On Python 3, calling `unquote()` first would
            # interpret the bytes as UTF-8 text, and the ISO-8859-1 round
            # trip that followed failed or mis-decoded non-ASCII names.
            to_bytes = getattr(urlparse, 'unquote_to_bytes', None)
            if to_bytes is None:
                # Fallback for a `urlparse` without `unquote_to_bytes`
                # (e.g. the Python 2 module, whose `unquote()` keeps bytes).
                raw = urlparse.unquote(encoded)
            else:
                raw = to_bytes(encoded)
            return _safe_filename(raw.decode(charset))
        except (ValueError, LookupError):
            # Malformed value, undecodable bytes or unknown charset:
            # fall through to the plain `filename` parameter.
            pass

    if 'filename' in params:
        return _safe_filename(params['filename'])

    return None

63 

64 

def _get_filename_from_url(url, content_type=None):
    """
    Determine an output filename based on the download URL.

    The last component of the URL path is sanitized and used as the name.
    When that name contains no '.', an extension guessed from
    `content_type` is appended if one is available. When the URL yields no
    usable name at all, 'download' plus the guessed extension is used.

    Returns `None` if neither a name nor an extension can be determined.
    """
    parsed = urlparse.urlparse(url)
    # Trailing slashes are dropped so '/files/report/' still yields 'report'.
    final_path_component = posixpath.basename(parsed.path.rstrip('/'))
    filename = _safe_filename(final_path_component)

    # Normalize a missing extension to '' so the concatenation below cannot
    # raise TypeError.
    # NOTE(review): assumes `guess_extension` may return a falsy value such
    # as `None` for unknown content types - confirm against coreapi.utils.
    suffix = guess_extension(content_type or '') or ''

    if filename:
        if '.' not in filename:
            return filename + suffix
        return filename
    if suffix:
        return 'download' + suffix

    return None

82 

83 

def _get_filename(base_url=None, content_type=None, content_disposition=None):
    """
    Pick the filename to use for a download.

    The `Content-Disposition` header is consulted first when present. If it
    gives no name and a `base_url` is available, the name is derived from
    the URL and `content_type` instead.

    Returns `None` rather than an empty value when no name is found.
    """
    if content_disposition:
        candidate = _get_filename_from_content_disposition(content_disposition)
    else:
        candidate = None

    if not candidate and base_url:
        candidate = _get_filename_from_url(base_url, content_type)

    # Collapse any empty result to `None` so callers see one "no name" value.
    return candidate or None

96 

97 

class DownloadCodec(BaseCodec):
    """
    A codec to handle raw file downloads, such as images and other media.

    Decoding writes the payload to disk and returns a `DownloadedFile`
    wrapping the opened file.
    """
    media_type = '*/*'
    format = 'download'

    def __init__(self, download_dir=None):
        """
        `download_dir` - The path to use for file downloads.

        When omitted, downloads stay in the directory chosen by
        `tempfile.mkstemp()` and are passed to `DownloadedFile` with
        `delete=True`.
        """
        self._delete_on_close = download_dir is None
        self._download_dir = download_dir

    @property
    def download_dir(self):
        """The configured download directory, or `None` if none was given."""
        return self._download_dir

    def decode(self, bytestring, **options):
        """
        Write `bytestring` to a file and return it as a `DownloadedFile`.

        Recognized options: `base_url`, `content_type` and
        `content_disposition`, which are used to choose the output filename.
        If no filename can be determined, the temporary file's own name is
        used. An existing file at the target path is never overwritten; a
        numbered variant of the name is used instead.
        """
        # Local import keeps this block self-contained; `shutil` is stdlib.
        import shutil

        base_url = options.get('base_url')
        content_type = options.get('content_type')
        content_disposition = options.get('content_disposition')

        # Write the download to a temporary .download file.
        fd, temp_path = tempfile.mkstemp(suffix='.download')
        try:
            # `with` guarantees the handle is closed even if the write fails.
            with os.fdopen(fd, 'wb') as file_handle:
                file_handle.write(bytestring)
        except Exception:
            # Do not leave a partial temporary file behind.
            os.remove(temp_path)
            raise

        # Determine the output filename.
        output_filename = _get_filename(base_url, content_type, content_disposition)
        if output_filename is None:
            output_filename = os.path.basename(temp_path)

        # Determine the output directory.
        output_dir = self._download_dir
        if output_dir is None:
            output_dir = os.path.dirname(temp_path)

        # Determine the full output path.
        output_path = os.path.join(output_dir, output_filename)

        # Move the temporary download file to the final location.
        if output_path != temp_path:
            output_path = _unique_output_path(output_path)
            # `shutil.move` falls back to copy-and-delete when the temporary
            # directory and the download directory are on different
            # filesystems, where a plain `os.rename` raises OSError.
            shutil.move(temp_path, output_path)

        # Open the file and return the file object.
        output_file = open(output_path, 'rb')
        downloaded = DownloadedFile(output_file, output_path, delete=self._delete_on_close)
        downloaded.basename = output_filename
        return downloaded

148 downloaded.basename = output_filename 

149 return downloaded