Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/django/core/files/locks.py: 26%
60 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1"""
2Portable file locking utilities.
4Based partially on an example by Jonathan Feinberg in the Python
5Cookbook [1] (licensed under the Python Software License) and a ctypes port by
6Anatoly Techtonik for Roundup [2] (license [3]).
8[1] http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/65203
9[2] https://sourceforge.net/p/roundup/code/ci/default/tree/roundup/backends/portalocker.py # NOQA
10[3] https://sourceforge.net/p/roundup/code/ci/default/tree/COPYING.txt
12Example Usage::
14 >>> from django.core.files import locks
15 >>> with open('./file', 'wb') as f:
16 ... locks.lock(f, locks.LOCK_EX)
17 ... f.write(b'Django')
18"""
19import os
21__all__ = ("LOCK_EX", "LOCK_SH", "LOCK_NB", "lock", "unlock")
24def _fd(f):
25 """Get a filedescriptor from something which could be a file or an fd."""
26 return f.fileno() if hasattr(f, "fileno") else f
if os.name == "nt":
    # Windows: lock through the Win32 LockFileEx/UnlockFileEx calls via ctypes.
    import msvcrt
    from ctypes import (
        POINTER,
        Structure,
        Union,
        byref,
        c_int64,
        c_ulong,
        c_void_p,
        sizeof,
        windll,
    )
    from ctypes.wintypes import BOOL, DWORD, HANDLE

    LOCK_SH = 0  # the default
    LOCK_NB = 0x1  # LOCKFILE_FAIL_IMMEDIATELY
    LOCK_EX = 0x2  # LOCKFILE_EXCLUSIVE_LOCK

    # --- Adapted from the pyserial project ---
    # detect size of ULONG_PTR
    if sizeof(c_ulong) != sizeof(c_void_p):
        ULONG_PTR = c_int64
    else:
        ULONG_PTR = c_ulong
    PVOID = c_void_p

    # --- Union inside Structure by stackoverflow:3480240 ---
    class _OFFSET(Structure):
        _fields_ = [("Offset", DWORD), ("OffsetHigh", DWORD)]

    class _OFFSET_UNION(Union):
        _anonymous_ = ["_offset"]
        _fields_ = [("_offset", _OFFSET), ("Pointer", PVOID)]

    class OVERLAPPED(Structure):
        _anonymous_ = ["_offset_union"]
        _fields_ = [
            ("Internal", ULONG_PTR),
            ("InternalHigh", ULONG_PTR),
            ("_offset_union", _OFFSET_UNION),
            ("hEvent", HANDLE),
        ]

    LPOVERLAPPED = POINTER(OVERLAPPED)

    # --- Define function prototypes for extra safety ---
    LockFileEx = windll.kernel32.LockFileEx
    LockFileEx.restype = BOOL
    LockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, DWORD, LPOVERLAPPED]
    UnlockFileEx = windll.kernel32.UnlockFileEx
    UnlockFileEx.restype = BOOL
    UnlockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, LPOVERLAPPED]

    def lock(f, flags):
        """Lock *f* (a file object or fd) using ``LockFileEx``.

        *flags* is passed straight through as the call's flags argument, so
        it should be a combination of ``LOCK_SH``/``LOCK_EX``/``LOCK_NB``.
        Return ``True`` if the call reports success, ``False`` otherwise.
        """
        hfile = msvcrt.get_osfhandle(_fd(f))
        # A default-constructed OVERLAPPED is passed by reference; its
        # offset fields are left at their initial values.
        overlapped = OVERLAPPED()
        ret = LockFileEx(hfile, flags, 0, 0, 0xFFFF0000, byref(overlapped))
        return bool(ret)

    def unlock(f):
        """Unlock *f* (a file object or fd) using ``UnlockFileEx``.

        Uses the same range arguments (``0, 0xFFFF0000``) as ``lock()``.
        Return ``True`` if the call reports success, ``False`` otherwise.
        """
        hfile = msvcrt.get_osfhandle(_fd(f))
        overlapped = OVERLAPPED()
        ret = UnlockFileEx(hfile, 0, 0, 0xFFFF0000, byref(overlapped))
        return bool(ret)

else:
    try:
        import fcntl

        LOCK_SH = fcntl.LOCK_SH  # shared lock
        LOCK_NB = fcntl.LOCK_NB  # non-blocking
        LOCK_EX = fcntl.LOCK_EX
    except (ImportError, AttributeError):
        # File locking is not supported: fcntl is missing or lacks the
        # LOCK_* constants.
        LOCK_EX = LOCK_SH = LOCK_NB = 0

        # Dummy functions that don't do anything.
        def lock(f, flags):
            """Do nothing and return ``False``: the file is not locked."""
            # File is not locked
            return False

        def unlock(f):
            """Do nothing and return ``True``: the file is unlocked."""
            # File is unlocked
            return True

    else:

        def lock(f, flags):
            """Lock *f* (a file object or fd) with ``fcntl.flock``.

            Return ``True`` once the lock is acquired, or ``False`` if
            ``flock`` raises ``BlockingIOError``. Any other exception
            propagates to the caller.
            """
            try:
                fcntl.flock(_fd(f), flags)
                return True
            except BlockingIOError:
                return False

        def unlock(f):
            """Release the lock on *f* with ``fcntl.LOCK_UN``; return ``True``.

            Exceptions raised by ``fcntl.flock`` propagate to the caller.
            """
            fcntl.flock(_fd(f), fcntl.LOCK_UN)
            return True