1 | """Synchronization functions. |
---|
2 | |
---|
3 | File- and mutex-based mutual exclusion synchronizers are provided, |
---|
4 | as well as a name-based mutex which locks within an application |
---|
5 | based on a string name. |
---|
6 | |
---|
7 | """ |
---|
8 | |
---|
9 | import os |
---|
10 | import sys |
---|
11 | import tempfile |
---|
12 | |
---|
13 | try: |
---|
14 | import threading as _threading |
---|
15 | except ImportError: |
---|
16 | import dummy_threading as _threading |
---|
17 | |
---|
18 | # check for fcntl module |
---|
19 | try: |
---|
20 | sys.getwindowsversion() |
---|
21 | has_flock = False |
---|
22 | except: |
---|
23 | try: |
---|
24 | import fcntl |
---|
25 | has_flock = True |
---|
26 | except ImportError: |
---|
27 | has_flock = False |
---|
28 | |
---|
29 | from beaker import util |
---|
30 | from beaker.exceptions import LockError |
---|
31 | |
---|
32 | __all__ = ["file_synchronizer", "mutex_synchronizer", "null_synchronizer", |
---|
33 | "NameLock", "_threading"] |
---|
34 | |
---|
35 | |
---|
36 | class NameLock(object): |
---|
37 | """a proxy for an RLock object that is stored in a name based |
---|
38 | registry. |
---|
39 | |
---|
40 | Multiple threads can get a reference to the same RLock based on the |
---|
41 | name alone, and synchronize operations related to that name. |
---|
42 | |
---|
43 | """ |
---|
44 | locks = util.WeakValuedRegistry() |
---|
45 | |
---|
46 | class NLContainer(object): |
---|
47 | def __init__(self, reentrant): |
---|
48 | if reentrant: |
---|
49 | self.lock = _threading.RLock() |
---|
50 | else: |
---|
51 | self.lock = _threading.Lock() |
---|
52 | def __call__(self): |
---|
53 | return self.lock |
---|
54 | |
---|
55 | def __init__(self, identifier = None, reentrant = False): |
---|
56 | if identifier is None: |
---|
57 | self._lock = NameLock.NLContainer(reentrant) |
---|
58 | else: |
---|
59 | self._lock = NameLock.locks.get(identifier, NameLock.NLContainer, |
---|
60 | reentrant) |
---|
61 | |
---|
62 | def acquire(self, wait = True): |
---|
63 | return self._lock().acquire(wait) |
---|
64 | |
---|
65 | def release(self): |
---|
66 | self._lock().release() |
---|
67 | |
---|
68 | |
---|
69 | _synchronizers = util.WeakValuedRegistry() |
---|
70 | def _synchronizer(identifier, cls, **kwargs): |
---|
71 | return _synchronizers.sync_get((identifier, cls), cls, identifier, **kwargs) |
---|
72 | |
---|
73 | |
---|
74 | def file_synchronizer(identifier, **kwargs): |
---|
75 | if not has_flock or 'lock_dir' not in kwargs: |
---|
76 | return mutex_synchronizer(identifier) |
---|
77 | else: |
---|
78 | return _synchronizer(identifier, FileSynchronizer, **kwargs) |
---|
79 | |
---|
80 | |
---|
81 | def mutex_synchronizer(identifier, **kwargs): |
---|
82 | return _synchronizer(identifier, ConditionSynchronizer, **kwargs) |
---|
83 | |
---|
84 | |
---|
85 | class null_synchronizer(object): |
---|
86 | def acquire_write_lock(self, wait=True): |
---|
87 | return True |
---|
88 | def acquire_read_lock(self): |
---|
89 | pass |
---|
90 | def release_write_lock(self): |
---|
91 | pass |
---|
92 | def release_read_lock(self): |
---|
93 | pass |
---|
94 | acquire = acquire_write_lock |
---|
95 | release = release_write_lock |
---|
96 | |
---|
97 | |
---|
98 | class SynchronizerImpl(object): |
---|
99 | def __init__(self): |
---|
100 | self._state = util.ThreadLocal() |
---|
101 | |
---|
102 | class SyncState(object): |
---|
103 | __slots__ = 'reentrantcount', 'writing', 'reading' |
---|
104 | |
---|
105 | def __init__(self): |
---|
106 | self.reentrantcount = 0 |
---|
107 | self.writing = False |
---|
108 | self.reading = False |
---|
109 | |
---|
110 | def state(self): |
---|
111 | if not self._state.has(): |
---|
112 | state = SynchronizerImpl.SyncState() |
---|
113 | self._state.put(state) |
---|
114 | return state |
---|
115 | else: |
---|
116 | return self._state.get() |
---|
117 | state = property(state) |
---|
118 | |
---|
119 | def release_read_lock(self): |
---|
120 | state = self.state |
---|
121 | |
---|
122 | if state.writing: |
---|
123 | raise LockError("lock is in writing state") |
---|
124 | if not state.reading: |
---|
125 | raise LockError("lock is not in reading state") |
---|
126 | |
---|
127 | if state.reentrantcount == 1: |
---|
128 | self.do_release_read_lock() |
---|
129 | state.reading = False |
---|
130 | |
---|
131 | state.reentrantcount -= 1 |
---|
132 | |
---|
133 | def acquire_read_lock(self, wait = True): |
---|
134 | state = self.state |
---|
135 | |
---|
136 | if state.writing: |
---|
137 | raise LockError("lock is in writing state") |
---|
138 | |
---|
139 | if state.reentrantcount == 0: |
---|
140 | x = self.do_acquire_read_lock(wait) |
---|
141 | if (wait or x): |
---|
142 | state.reentrantcount += 1 |
---|
143 | state.reading = True |
---|
144 | return x |
---|
145 | elif state.reading: |
---|
146 | state.reentrantcount += 1 |
---|
147 | return True |
---|
148 | |
---|
149 | def release_write_lock(self): |
---|
150 | state = self.state |
---|
151 | |
---|
152 | if state.reading: |
---|
153 | raise LockError("lock is in reading state") |
---|
154 | if not state.writing: |
---|
155 | raise LockError("lock is not in writing state") |
---|
156 | |
---|
157 | if state.reentrantcount == 1: |
---|
158 | self.do_release_write_lock() |
---|
159 | state.writing = False |
---|
160 | |
---|
161 | state.reentrantcount -= 1 |
---|
162 | |
---|
163 | release = release_write_lock |
---|
164 | |
---|
165 | def acquire_write_lock(self, wait = True): |
---|
166 | state = self.state |
---|
167 | |
---|
168 | if state.reading: |
---|
169 | raise LockError("lock is in reading state") |
---|
170 | |
---|
171 | if state.reentrantcount == 0: |
---|
172 | x = self.do_acquire_write_lock(wait) |
---|
173 | if (wait or x): |
---|
174 | state.reentrantcount += 1 |
---|
175 | state.writing = True |
---|
176 | return x |
---|
177 | elif state.writing: |
---|
178 | state.reentrantcount += 1 |
---|
179 | return True |
---|
180 | |
---|
181 | acquire = acquire_write_lock |
---|
182 | |
---|
183 | def do_release_read_lock(self): |
---|
184 | raise NotImplementedError() |
---|
185 | |
---|
186 | def do_acquire_read_lock(self): |
---|
187 | raise NotImplementedError() |
---|
188 | |
---|
189 | def do_release_write_lock(self): |
---|
190 | raise NotImplementedError() |
---|
191 | |
---|
192 | def do_acquire_write_lock(self): |
---|
193 | raise NotImplementedError() |
---|
194 | |
---|
195 | |
---|
196 | class FileSynchronizer(SynchronizerImpl): |
---|
197 | """a synchronizer which locks using flock(). |
---|
198 | |
---|
199 | Adapted for Python/multithreads from Apache::Session::Lock::File, |
---|
200 | http://search.cpan.org/src/CWEST/Apache-Session-1.81/Session/Lock/File.pm |
---|
201 | |
---|
202 | This module does not unlink temporary files, |
---|
203 | because it interferes with proper locking. This can cause |
---|
204 | problems on certain systems (Linux) whose file systems (ext2) do not |
---|
205 | perform well with lots of files in one directory. To prevent this |
---|
206 | you should use a script to clean out old files from your lock directory. |
---|
207 | |
---|
208 | """ |
---|
209 | def __init__(self, identifier, lock_dir): |
---|
210 | super(FileSynchronizer, self).__init__() |
---|
211 | self._filedescriptor = util.ThreadLocal() |
---|
212 | |
---|
213 | if lock_dir is None: |
---|
214 | lock_dir = tempfile.gettempdir() |
---|
215 | else: |
---|
216 | lock_dir = lock_dir |
---|
217 | |
---|
218 | self.filename = util.encoded_path( |
---|
219 | lock_dir, |
---|
220 | [identifier], |
---|
221 | extension='.lock' |
---|
222 | ) |
---|
223 | |
---|
224 | def _filedesc(self): |
---|
225 | return self._filedescriptor.get() |
---|
226 | _filedesc = property(_filedesc) |
---|
227 | |
---|
228 | def _open(self, mode): |
---|
229 | filedescriptor = self._filedesc |
---|
230 | if filedescriptor is None: |
---|
231 | filedescriptor = os.open(self.filename, mode) |
---|
232 | self._filedescriptor.put(filedescriptor) |
---|
233 | return filedescriptor |
---|
234 | |
---|
235 | def do_acquire_read_lock(self, wait): |
---|
236 | filedescriptor = self._open(os.O_CREAT | os.O_RDONLY) |
---|
237 | if not wait: |
---|
238 | try: |
---|
239 | fcntl.flock(filedescriptor, fcntl.LOCK_SH | fcntl.LOCK_NB) |
---|
240 | return True |
---|
241 | except IOError: |
---|
242 | os.close(filedescriptor) |
---|
243 | self._filedescriptor.remove() |
---|
244 | return False |
---|
245 | else: |
---|
246 | fcntl.flock(filedescriptor, fcntl.LOCK_SH) |
---|
247 | return True |
---|
248 | |
---|
249 | def do_acquire_write_lock(self, wait): |
---|
250 | filedescriptor = self._open(os.O_CREAT | os.O_WRONLY) |
---|
251 | if not wait: |
---|
252 | try: |
---|
253 | fcntl.flock(filedescriptor, fcntl.LOCK_EX | fcntl.LOCK_NB) |
---|
254 | return True |
---|
255 | except IOError: |
---|
256 | os.close(filedescriptor) |
---|
257 | self._filedescriptor.remove() |
---|
258 | return False |
---|
259 | else: |
---|
260 | fcntl.flock(filedescriptor, fcntl.LOCK_EX) |
---|
261 | return True |
---|
262 | |
---|
263 | def do_release_read_lock(self): |
---|
264 | self._release_all_locks() |
---|
265 | |
---|
266 | def do_release_write_lock(self): |
---|
267 | self._release_all_locks() |
---|
268 | |
---|
269 | def _release_all_locks(self): |
---|
270 | filedescriptor = self._filedesc |
---|
271 | if filedescriptor is not None: |
---|
272 | fcntl.flock(filedescriptor, fcntl.LOCK_UN) |
---|
273 | os.close(filedescriptor) |
---|
274 | self._filedescriptor.remove() |
---|
275 | |
---|
276 | |
---|
277 | class ConditionSynchronizer(SynchronizerImpl): |
---|
278 | """a synchronizer using a Condition.""" |
---|
279 | |
---|
280 | def __init__(self, identifier): |
---|
281 | super(ConditionSynchronizer, self).__init__() |
---|
282 | |
---|
283 | # counts how many asynchronous methods are executing |
---|
284 | self.async = 0 |
---|
285 | |
---|
286 | # pointer to thread that is the current sync operation |
---|
287 | self.current_sync_operation = None |
---|
288 | |
---|
289 | # condition object to lock on |
---|
290 | self.condition = _threading.Condition(_threading.Lock()) |
---|
291 | |
---|
292 | def do_acquire_read_lock(self, wait = True): |
---|
293 | self.condition.acquire() |
---|
294 | try: |
---|
295 | # see if a synchronous operation is waiting to start |
---|
296 | # or is already running, in which case we wait (or just |
---|
297 | # give up and return) |
---|
298 | if wait: |
---|
299 | while self.current_sync_operation is not None: |
---|
300 | self.condition.wait() |
---|
301 | else: |
---|
302 | if self.current_sync_operation is not None: |
---|
303 | return False |
---|
304 | |
---|
305 | self.async += 1 |
---|
306 | finally: |
---|
307 | self.condition.release() |
---|
308 | |
---|
309 | if not wait: |
---|
310 | return True |
---|
311 | |
---|
312 | def do_release_read_lock(self): |
---|
313 | self.condition.acquire() |
---|
314 | try: |
---|
315 | self.async -= 1 |
---|
316 | |
---|
317 | # check if we are the last asynchronous reader thread |
---|
318 | # out the door. |
---|
319 | if self.async == 0: |
---|
320 | # yes. so if a sync operation is waiting, notifyAll to wake |
---|
321 | # it up |
---|
322 | if self.current_sync_operation is not None: |
---|
323 | self.condition.notifyAll() |
---|
324 | elif self.async < 0: |
---|
325 | raise LockError("Synchronizer error - too many " |
---|
326 | "release_read_locks called") |
---|
327 | finally: |
---|
328 | self.condition.release() |
---|
329 | |
---|
330 | def do_acquire_write_lock(self, wait = True): |
---|
331 | self.condition.acquire() |
---|
332 | try: |
---|
333 | # here, we are not a synchronous reader, and after returning, |
---|
334 | # assuming waiting or immediate availability, we will be. |
---|
335 | |
---|
336 | if wait: |
---|
337 | # if another sync is working, wait |
---|
338 | while self.current_sync_operation is not None: |
---|
339 | self.condition.wait() |
---|
340 | else: |
---|
341 | # if another sync is working, |
---|
342 | # we dont want to wait, so forget it |
---|
343 | if self.current_sync_operation is not None: |
---|
344 | return False |
---|
345 | |
---|
346 | # establish ourselves as the current sync |
---|
347 | # this indicates to other read/write operations |
---|
348 | # that they should wait until this is None again |
---|
349 | self.current_sync_operation = _threading.currentThread() |
---|
350 | |
---|
351 | # now wait again for asyncs to finish |
---|
352 | if self.async > 0: |
---|
353 | if wait: |
---|
354 | # wait |
---|
355 | self.condition.wait() |
---|
356 | else: |
---|
357 | # we dont want to wait, so forget it |
---|
358 | self.current_sync_operation = None |
---|
359 | return False |
---|
360 | finally: |
---|
361 | self.condition.release() |
---|
362 | |
---|
363 | if not wait: |
---|
364 | return True |
---|
365 | |
---|
366 | def do_release_write_lock(self): |
---|
367 | self.condition.acquire() |
---|
368 | try: |
---|
369 | if self.current_sync_operation is not _threading.currentThread(): |
---|
370 | raise LockError("Synchronizer error - current thread doesnt " |
---|
371 | "have the write lock") |
---|
372 | |
---|
373 | # reset the current sync operation so |
---|
374 | # another can get it |
---|
375 | self.current_sync_operation = None |
---|
376 | |
---|
377 | # tell everyone to get ready |
---|
378 | self.condition.notifyAll() |
---|
379 | finally: |
---|
380 | # everyone go !! |
---|
381 | self.condition.release() |
---|