root/galaxy-central/eggs/Beaker-1.4-py2.6.egg/beaker/synchronization.py

リビジョン 3, 11.5 KB (コミッタ: kohda, 14 年 前)

Install Unix tools  http://hannonlab.cshl.edu/galaxy_unix_tools/galaxy.html

行番号 
1"""Synchronization functions.
2
3File- and mutex-based mutual exclusion synchronizers are provided,
4as well as a name-based mutex which locks within an application
5based on a string name.
6
7"""
8
9import os
10import sys
11import tempfile
12
13try:
14    import threading as _threading
15except ImportError:
16    import dummy_threading as _threading
17
# Detect whether flock()-based file locking can be used.
# sys.getwindowsversion() is only available on Windows; on other platforms
# the attribute lookup raises AttributeError, in which case we probe for
# the fcntl module.  Only that specific error is caught so that unrelated
# exceptions (e.g. KeyboardInterrupt) are not silently swallowed.
try:
    sys.getwindowsversion()
    has_flock = False
except AttributeError:
    try:
        import fcntl
        has_flock = True
    except ImportError:
        has_flock = False
28
29from beaker import util
30from beaker.exceptions import LockError
31
32__all__  = ["file_synchronizer", "mutex_synchronizer", "null_synchronizer",
33            "NameLock", "_threading"]
34
35
class NameLock(object):
    """Proxy for a threading lock that can be looked up by name.

    When an identifier is given, the lock container is obtained from the
    class-level ``locks`` registry under that identifier, so that multiple
    threads can get a reference to the same lock based on the name alone
    and synchronize operations related to that name.  Without an
    identifier a fresh, private container is created.

    """
    locks = util.WeakValuedRegistry()

    class NLContainer(object):
        """Holds the underlying lock; calling the container returns it."""

        def __init__(self, reentrant):
            # An RLock when re-entrancy is requested, a plain Lock otherwise.
            lock_factory = _threading.RLock if reentrant else _threading.Lock
            self.lock = lock_factory()

        def __call__(self):
            return self.lock

    def __init__(self, identifier=None, reentrant=False):
        if identifier is not None:
            # Named lock: fetch (or create) the container via the registry.
            self._lock = NameLock.locks.get(
                identifier, NameLock.NLContainer, reentrant)
            return
        # Anonymous lock: a container that is not registered anywhere.
        self._lock = NameLock.NLContainer(reentrant)

    def acquire(self, wait=True):
        """Acquire the underlying lock and return what its ``acquire``
        returns; ``wait`` is passed straight through."""
        underlying = self._lock()
        return underlying.acquire(wait)

    def release(self):
        """Release the underlying lock."""
        underlying = self._lock()
        underlying.release()
67
68
_synchronizers = util.WeakValuedRegistry()
def _synchronizer(identifier, cls, **kwargs):
    """Return the synchronizer registered for ``(identifier, cls)``.

    The lookup goes through ``_synchronizers.sync_get``, which is handed
    ``cls`` as the creation callable together with ``identifier`` and any
    extra keyword arguments.
    """
    registry_key = (identifier, cls)
    return _synchronizers.sync_get(registry_key, cls, identifier, **kwargs)
72
73
def file_synchronizer(identifier, **kwargs):
    """Return a file-based synchronizer for ``identifier`` when possible.

    A ``FileSynchronizer`` is used only if flock() support was detected
    (``has_flock``) and a ``lock_dir`` keyword argument was supplied;
    otherwise this falls back to ``mutex_synchronizer(identifier)`` and the
    keyword arguments are not forwarded.
    """
    if has_flock and 'lock_dir' in kwargs:
        return _synchronizer(identifier, FileSynchronizer, **kwargs)
    # No flock() or no lock directory given: use the mutex-based variant.
    return mutex_synchronizer(identifier)
79
80
def mutex_synchronizer(identifier, **kwargs):
    """Return the ``ConditionSynchronizer`` registered for ``identifier``.

    Extra keyword arguments are forwarded to ``_synchronizer``.
    """
    synchronizer = _synchronizer(identifier, ConditionSynchronizer, **kwargs)
    return synchronizer
83
84
class null_synchronizer(object):
    """A no-op synchronizer exposing the synchronizer method names.

    Every acquire succeeds immediately and every release does nothing.
    Both acquire methods accept ``wait`` and return ``True`` so that this
    class can be called the same way as ``SynchronizerImpl`` subclasses.
    """

    def acquire_write_lock(self, wait=True):
        """Pretend to take the write lock; always returns ``True``."""
        return True

    def acquire_read_lock(self, wait=True):
        """Pretend to take the read lock; always returns ``True``."""
        return True

    def release_write_lock(self):
        """Do nothing."""
        pass

    def release_read_lock(self):
        """Do nothing."""
        pass

    # Short aliases, matching SynchronizerImpl.acquire / release.
    acquire = acquire_write_lock
    release = release_write_lock
96
97
class SynchronizerImpl(object):
    """Base class providing re-entrant read/write lock bookkeeping.

    The bookkeeping (re-entrancy count and whether the holder is reading
    or writing) lives in a ``SyncState`` object kept in a
    ``util.ThreadLocal`` container.  The actual locking is delegated to the
    four ``do_*`` hooks, which subclasses must override.
    """

    def __init__(self):
        self._state = util.ThreadLocal()

    class SyncState(object):
        """Bookkeeping record: nesting depth plus read/write mode flags."""
        __slots__ = 'reentrantcount', 'writing', 'reading'

        def __init__(self):
            self.reentrantcount = 0
            self.writing = False
            self.reading = False

    @property
    def state(self):
        """The ``SyncState`` held in ``self._state``, created on first use."""
        if not self._state.has():
            state = SynchronizerImpl.SyncState()
            self._state.put(state)
            return state
        else:
            return self._state.get()

    def release_read_lock(self):
        """Undo one ``acquire_read_lock``.

        The underlying lock is released through ``do_release_read_lock``
        only when the outermost acquisition is being undone.

        Raises ``LockError`` if the state is writing or is not reading.
        """
        state = self.state

        if state.writing:
            raise LockError("lock is in writing state")
        if not state.reading:
            raise LockError("lock is not in reading state")

        if state.reentrantcount == 1:
            self.do_release_read_lock()
            state.reading = False

        state.reentrantcount -= 1

    def acquire_read_lock(self, wait=True):
        """Acquire the read lock, re-entrantly.

        On the first acquisition the result of ``do_acquire_read_lock(wait)``
        is returned, and the state is marked as reading if ``wait`` is true
        or the hook returned a true value.  Nested acquisitions only bump
        the counter and return ``True``.

        Raises ``LockError`` if the state is writing.
        """
        state = self.state

        if state.writing:
            raise LockError("lock is in writing state")

        if state.reentrantcount == 0:
            x = self.do_acquire_read_lock(wait)
            if wait or x:
                state.reentrantcount += 1
                state.reading = True
            return x
        elif state.reading:
            state.reentrantcount += 1
            return True

    def release_write_lock(self):
        """Undo one ``acquire_write_lock``.

        The underlying lock is released through ``do_release_write_lock``
        only when the outermost acquisition is being undone.

        Raises ``LockError`` if the state is reading or is not writing.
        """
        state = self.state

        if state.reading:
            raise LockError("lock is in reading state")
        if not state.writing:
            raise LockError("lock is not in writing state")

        if state.reentrantcount == 1:
            self.do_release_write_lock()
            state.writing = False

        state.reentrantcount -= 1

    release = release_write_lock

    def acquire_write_lock(self, wait=True):
        """Acquire the write lock, re-entrantly.

        On the first acquisition the result of
        ``do_acquire_write_lock(wait)`` is returned, and the state is marked
        as writing if ``wait`` is true or the hook returned a true value.
        Nested acquisitions only bump the counter and return ``True``.

        Raises ``LockError`` if the state is reading.
        """
        state = self.state

        if state.reading:
            raise LockError("lock is in reading state")

        if state.reentrantcount == 0:
            x = self.do_acquire_write_lock(wait)
            if wait or x:
                state.reentrantcount += 1
                state.writing = True
            return x
        elif state.writing:
            state.reentrantcount += 1
            return True

    acquire = acquire_write_lock

    # -- hooks to be overridden by subclasses --------------------------------
    # The acquire hooks accept ``wait`` because acquire_read_lock /
    # acquire_write_lock always pass it; without the parameter the base
    # class would fail with TypeError rather than NotImplementedError.

    def do_release_read_lock(self):
        """Release the underlying read lock (subclass responsibility)."""
        raise NotImplementedError()

    def do_acquire_read_lock(self, wait=True):
        """Take the underlying read lock (subclass responsibility)."""
        raise NotImplementedError()

    def do_release_write_lock(self):
        """Release the underlying write lock (subclass responsibility)."""
        raise NotImplementedError()

    def do_acquire_write_lock(self, wait=True):
        """Take the underlying write lock (subclass responsibility)."""
        raise NotImplementedError()
194
195
class FileSynchronizer(SynchronizerImpl):
    """A synchronizer that locks with flock() on a ``.lock`` file.

    Adapted for Python/multithreads from Apache::Session::Lock::File,
    http://search.cpan.org/src/CWEST/Apache-Session-1.81/Session/Lock/File.pm

    Lock files are never unlinked, because removing them interferes with
    proper locking.  On systems whose file systems cope badly with many
    files in one directory (e.g. ext2 on Linux) this can become a problem;
    use a separate script to clean old files out of the lock directory.

    """
    def __init__(self, identifier, lock_dir):
        super(FileSynchronizer, self).__init__()
        self._filedescriptor = util.ThreadLocal()

        # Fall back to the system temp directory when no directory is given.
        directory = tempfile.gettempdir() if lock_dir is None else lock_dir

        self.filename = util.encoded_path(
            directory, [identifier], extension='.lock')

    @property
    def _filedesc(self):
        """The descriptor currently stored in ``self._filedescriptor``."""
        return self._filedescriptor.get()

    def _open(self, mode):
        """Return the stored descriptor, opening the lock file with
        ``mode`` and storing the result if none is stored yet."""
        fd = self._filedesc
        if fd is not None:
            return fd
        fd = os.open(self.filename, mode)
        self._filedescriptor.put(fd)
        return fd

    def _acquire_flock(self, open_flags, operation, wait):
        """Open the lock file and flock() it with ``operation``.

        With ``wait`` true the call blocks and returns ``True``.  Otherwise
        a non-blocking attempt is made; on ``IOError`` the descriptor is
        closed and forgotten and ``False`` is returned.
        """
        fd = self._open(open_flags)
        if wait:
            fcntl.flock(fd, operation)
            return True
        try:
            fcntl.flock(fd, operation | fcntl.LOCK_NB)
        except IOError:
            os.close(fd)
            self._filedescriptor.remove()
            return False
        return True

    def do_acquire_read_lock(self, wait):
        """Take a shared flock() on the lock file."""
        return self._acquire_flock(
            os.O_CREAT | os.O_RDONLY, fcntl.LOCK_SH, wait)

    def do_acquire_write_lock(self, wait):
        """Take an exclusive flock() on the lock file."""
        return self._acquire_flock(
            os.O_CREAT | os.O_WRONLY, fcntl.LOCK_EX, wait)

    def do_release_read_lock(self):
        self._release_all_locks()

    def do_release_write_lock(self):
        self._release_all_locks()

    def _release_all_locks(self):
        """Unlock, close and forget the stored descriptor, if there is one."""
        fd = self._filedesc
        if fd is None:
            return
        fcntl.flock(fd, fcntl.LOCK_UN)
        os.close(fd)
        self._filedescriptor.remove()
275
276
class ConditionSynchronizer(SynchronizerImpl):
    """A synchronizer using a Condition.

    Readers ("asynchronous" operations) are counted in ``self.asynch``; a
    writer ("synchronous" operation) registers itself in
    ``self.current_sync_operation`` and then waits for the reader count to
    reach zero.

    The reader counter is named ``asynch`` because ``async`` is a reserved
    keyword from Python 3.7 on and cannot be used as an attribute name.
    """

    def __init__(self, identifier):
        super(ConditionSynchronizer, self).__init__()

        # counts how many asynchronous (read) operations are executing
        self.asynch = 0

        # the thread that is the current sync (write) operation, or None
        self.current_sync_operation = None

        # condition object to lock on
        self.condition = _threading.Condition(_threading.Lock())

    def do_acquire_read_lock(self, wait=True):
        """Register the caller as a reader.

        Returns ``True`` once registered, or ``False`` if ``wait`` is false
        and a sync operation is pending or running.
        """
        self.condition.acquire()
        try:
            # see if a synchronous operation is waiting to start
            # or is already running, in which case we wait (or just
            # give up and return)
            if wait:
                while self.current_sync_operation is not None:
                    self.condition.wait()
            elif self.current_sync_operation is not None:
                return False

            self.asynch += 1
        finally:
            self.condition.release()

        return True

    def do_release_read_lock(self):
        """Unregister the caller as a reader.

        Raises ``LockError`` if the reader count drops below zero.
        """
        self.condition.acquire()
        try:
            self.asynch -= 1

            # check if we are the last asynchronous reader thread
            # out the door.
            if self.asynch == 0:
                # yes. so if a sync operation is waiting, notifyAll to wake
                # it up
                if self.current_sync_operation is not None:
                    self.condition.notifyAll()
            elif self.asynch < 0:
                raise LockError("Synchronizer error - too many "
                                "release_read_locks called")
        finally:
            self.condition.release()

    def do_acquire_write_lock(self, wait=True):
        """Become the current sync operation.

        Returns ``True`` once the caller is the current sync operation and
        no readers remain, or ``False`` if ``wait`` is false and either
        another sync operation or any reader is active.
        """
        self.condition.acquire()
        try:
            # here, we are not a synchronous reader, and after returning,
            # assuming waiting or immediate availability, we will be.

            if wait:
                # if another sync is working, wait
                while self.current_sync_operation is not None:
                    self.condition.wait()
            elif self.current_sync_operation is not None:
                # if another sync is working,
                # we dont want to wait, so forget it
                return False

            # establish ourselves as the current sync
            # this indicates to other read/write operations
            # that they should wait until this is None again
            self.current_sync_operation = _threading.currentThread()

            # now wait again for asyncs to finish
            if self.asynch > 0:
                if not wait:
                    # we dont want to wait, so forget it
                    self.current_sync_operation = None
                    return False
                # Re-check the count after every wakeup instead of trusting
                # a single wait() to mean that all readers are gone.
                while self.asynch > 0:
                    self.condition.wait()
        finally:
            self.condition.release()

        return True

    def do_release_write_lock(self):
        """Give up the sync operation and wake all waiting threads.

        Raises ``LockError`` if the calling thread is not the current sync
        operation.
        """
        self.condition.acquire()
        try:
            if self.current_sync_operation is not _threading.currentThread():
                raise LockError("Synchronizer error - current thread doesnt "
                                "have the write lock")

            # reset the current sync operation so
            # another can get it
            self.current_sync_operation = None

            # tell everyone to get ready
            self.condition.notifyAll()
        finally:
            # everyone go !!
            self.condition.release()
Note: リポジトリブラウザについてのヘルプは TracBrowser を参照してください。