Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/core/files/locks.py: 26%

60 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1""" 

2Portable file locking utilities. 

3 

4Based partially on an example by Jonathan Feignberg in the Python 

5Cookbook [1] (licensed under the Python Software License) and a ctypes port by 

6Anatoly Techtonik for Roundup [2] (license [3]). 

7 

8[1] http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/65203 

9[2] https://sourceforge.net/p/roundup/code/ci/default/tree/roundup/backends/portalocker.py # NOQA 

10[3] https://sourceforge.net/p/roundup/code/ci/default/tree/COPYING.txt 

11 

12Example Usage:: 

13 

14 >>> from django.core.files import locks 

15 >>> with open('./file', 'wb') as f: 

16 ... locks.lock(f, locks.LOCK_EX) 

17 ... f.write(b'Django') 

18""" 

19import os 

20 

21__all__ = ("LOCK_EX", "LOCK_SH", "LOCK_NB", "lock", "unlock") 

22 

23 

def _fd(f):
    """Return a file descriptor for *f*.

    *f* may be either an object exposing a ``fileno()`` method (in which case
    the result of calling it is returned) or anything else, which is assumed
    to already be a descriptor and is returned unchanged.
    """
    return f.fileno() if hasattr(f, "fileno") else f

27 

28 

if os.name == "nt":
    # Windows: lock through the Win32 LockFileEx/UnlockFileEx API via ctypes.
    import msvcrt
    from ctypes import (
        POINTER,
        Structure,
        Union,
        byref,
        c_int64,
        c_ulong,
        c_void_p,
        sizeof,
        windll,
    )
    from ctypes.wintypes import BOOL, DWORD, HANDLE

    LOCK_SH = 0  # the default
    LOCK_NB = 0x1  # LOCKFILE_FAIL_IMMEDIATELY
    LOCK_EX = 0x2  # LOCKFILE_EXCLUSIVE_LOCK

    # --- Adapted from the pyserial project ---
    # detect size of ULONG_PTR
    if sizeof(c_ulong) != sizeof(c_void_p):
        ULONG_PTR = c_int64
    else:
        ULONG_PTR = c_ulong
    PVOID = c_void_p

    # --- Union inside Structure by stackoverflow:3480240 ---
    class _OFFSET(Structure):
        _fields_ = [("Offset", DWORD), ("OffsetHigh", DWORD)]

    class _OFFSET_UNION(Union):
        _anonymous_ = ["_offset"]
        _fields_ = [("_offset", _OFFSET), ("Pointer", PVOID)]

    class OVERLAPPED(Structure):
        _anonymous_ = ["_offset_union"]
        _fields_ = [
            ("Internal", ULONG_PTR),
            ("InternalHigh", ULONG_PTR),
            ("_offset_union", _OFFSET_UNION),
            ("hEvent", HANDLE),
        ]

    LPOVERLAPPED = POINTER(OVERLAPPED)

    # --- Define function prototypes for extra safety ---
    LockFileEx = windll.kernel32.LockFileEx
    LockFileEx.restype = BOOL
    LockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, DWORD, LPOVERLAPPED]
    UnlockFileEx = windll.kernel32.UnlockFileEx
    UnlockFileEx.restype = BOOL
    UnlockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, LPOVERLAPPED]

    def lock(f, flags):
        """Lock *f* using ``LockFileEx``.

        *f* is a file object or a file descriptor; *flags* is a combination
        of ``LOCK_EX`` and ``LOCK_NB`` (``LOCK_SH`` is 0, the default).
        Return True if ``LockFileEx`` reported success, False otherwise.
        """
        hfile = msvcrt.get_osfhandle(_fd(f))
        overlapped = OVERLAPPED()
        # Arguments after ``flags``: reserved (0), low DWORD of the byte
        # count (0) and high DWORD of the byte count (0xFFFF0000).
        ret = LockFileEx(hfile, flags, 0, 0, 0xFFFF0000, byref(overlapped))
        return bool(ret)

    def unlock(f):
        """Unlock *f* using ``UnlockFileEx``.

        The same byte-count arguments as in ``lock()`` are passed so the
        call targets the range that ``lock()`` requested. Return True if
        ``UnlockFileEx`` reported success, False otherwise.
        """
        hfile = msvcrt.get_osfhandle(_fd(f))
        overlapped = OVERLAPPED()
        ret = UnlockFileEx(hfile, 0, 0, 0xFFFF0000, byref(overlapped))
        return bool(ret)

else:
    try:
        import fcntl

        LOCK_SH = fcntl.LOCK_SH  # shared lock
        LOCK_NB = fcntl.LOCK_NB  # non-blocking
        LOCK_EX = fcntl.LOCK_EX
    except (ImportError, AttributeError):
        # File locking is not supported.
        LOCK_EX = LOCK_SH = LOCK_NB = 0

        # Dummy functions that don't do anything.
        def lock(f, flags):
            """No-op fallback: report that the file was not locked."""
            # File is not locked
            return False

        def unlock(f):
            """No-op fallback: report that the file is unlocked."""
            # File is unlocked
            return True

    else:

        def lock(f, flags):
            """Lock *f* with ``fcntl.flock()`` using *flags*.

            Return True once the lock call succeeds and False if it raises
            ``BlockingIOError``. Any other exception propagates to the
            caller.
            """
            try:
                fcntl.flock(_fd(f), flags)
                return True
            except BlockingIOError:
                return False

        def unlock(f):
            """Release the lock on *f* with ``fcntl.flock(..., LOCK_UN)``.

            Always return True; errors from ``flock()`` propagate.
            """
            fcntl.flock(_fd(f), fcntl.LOCK_UN)
            return True