[3] | 1 | """Synchronization functions. |
---|
| 2 | |
---|
| 3 | File- and mutex-based mutual exclusion synchronizers are provided, |
---|
| 4 | as well as a name-based mutex which locks within an application |
---|
| 5 | based on a string name. |
---|
| 6 | |
---|
| 7 | """ |
---|
| 8 | |
---|
| 9 | import os |
---|
| 10 | import sys |
---|
| 11 | import tempfile |
---|
| 12 | |
---|
| 13 | try: |
---|
| 14 | import threading as _threading |
---|
| 15 | except ImportError: |
---|
| 16 | import dummy_threading as _threading |
---|
| 17 | |
---|
# Decide whether flock()-based file locking can be used.
#
# The presence of sys.getwindowsversion is used as the "running on
# Windows" test; in that case flock() locking is treated as unavailable.
# Everywhere else it is available exactly when the fcntl module imports.
# An explicit attribute test is used instead of a bare ``except:`` so that
# KeyboardInterrupt / SystemExit and unrelated errors are never swallowed.
if hasattr(sys, "getwindowsversion"):
    has_flock = False
else:
    try:
        import fcntl
        has_flock = True
    except ImportError:
        has_flock = False
---|
| 28 | |
---|
| 29 | from beaker import util |
---|
| 30 | from beaker.exceptions import LockError |
---|
| 31 | |
---|
# Names exported by ``from ... import *``.
__all__ = [
    "file_synchronizer",
    "mutex_synchronizer",
    "null_synchronizer",
    "NameLock",
    "_threading",
]
---|
| 34 | |
---|
| 35 | |
---|
class NameLock(object):
    """Proxy for a thread lock that is looked up in a name based registry.

    Multiple threads can get a reference to the same lock based on the
    name alone, and synchronize operations related to that name.  When no
    identifier is given, the instance wraps a private lock of its own
    that is not placed in the registry.

    The lock is a ``_threading.RLock`` when ``reentrant`` is true and a
    plain ``_threading.Lock`` otherwise.

    """
    # Shared registry mapping identifiers to NLContainer instances.
    locks = util.WeakValuedRegistry()

    class NLContainer(object):
        """Holder for the real lock object; calling it returns the lock."""

        def __init__(self, reentrant):
            # Pick the lock type once, then instantiate it.
            lock_factory = _threading.RLock if reentrant else _threading.Lock
            self.lock = lock_factory()

        def __call__(self):
            return self.lock

    def __init__(self, identifier=None, reentrant=False):
        """Bind to the lock registered under ``identifier``.

        ``identifier`` -- registry key; ``None`` requests an anonymous,
        unregistered lock.
        ``reentrant`` -- passed to ``NLContainer`` to choose the lock type.
        """
        if identifier is not None:
            container = NameLock.locks.get(
                identifier, NameLock.NLContainer, reentrant)
        else:
            container = NameLock.NLContainer(reentrant)
        self._lock = container

    def acquire(self, wait=True):
        """Acquire the underlying lock and return what its acquire() returns."""
        underlying = self._lock()
        return underlying.acquire(wait)

    def release(self):
        """Release the underlying lock."""
        underlying = self._lock()
        underlying.release()
---|
| 67 | |
---|
| 68 | |
---|
# Registry of the synchronizer objects handed out by _synchronizer().
_synchronizers = util.WeakValuedRegistry()


def _synchronizer(identifier, cls, **kwargs):
    """Return the ``cls`` synchronizer registered for ``identifier``.

    The lookup goes through ``_synchronizers.sync_get`` using the key
    ``(identifier, cls)``; ``cls``, ``identifier`` and ``kwargs`` are
    handed to it as the creation callable and its arguments.
    """
    registry_key = (identifier, cls)
    return _synchronizers.sync_get(registry_key, cls, identifier, **kwargs)
---|
| 72 | |
---|
| 73 | |
---|
def file_synchronizer(identifier, **kwargs):
    """Return a file-locking synchronizer, or a mutex one as a fallback.

    A ``FileSynchronizer`` is used only when flock() is available
    (``has_flock``) and a ``lock_dir`` keyword argument was supplied.
    In every other case the request is answered by
    ``mutex_synchronizer(identifier)`` and the keyword arguments are
    not forwarded.
    """
    if has_flock and 'lock_dir' in kwargs:
        return _synchronizer(identifier, FileSynchronizer, **kwargs)
    return mutex_synchronizer(identifier)
---|
| 79 | |
---|
| 80 | |
---|
def mutex_synchronizer(identifier, **kwargs):
    """Return the ``ConditionSynchronizer`` registered for ``identifier``.

    Keyword arguments are passed through to ``_synchronizer`` unchanged.
    """
    synchronizer_cls = ConditionSynchronizer
    return _synchronizer(identifier, synchronizer_cls, **kwargs)
---|
| 83 | |
---|
| 84 | |
---|
class null_synchronizer(object):
    """Synchronizer stand-in that performs no locking at all.

    It offers the same method names as ``SynchronizerImpl`` so it can be
    used wherever a synchronizer is expected: every acquire reports
    success immediately and every release is a no-op.
    """

    def acquire_write_lock(self, wait=True):
        """Pretend to take the write lock; always returns True."""
        return True

    def acquire_read_lock(self, wait=True):
        """Pretend to take the read lock; always returns True.

        ``wait`` is accepted (and ignored) so that callers may pass it
        exactly as they would to a real synchronizer, and the result is
        truthy so that ``if not sync.acquire_read_lock(wait=False)``
        checks behave as they do for a successful real acquisition.
        """
        return True

    def release_write_lock(self):
        """Nothing to release."""
        pass

    def release_read_lock(self):
        """Nothing to release."""
        pass

    # Generic acquire/release map to the write lock.
    acquire = acquire_write_lock
    release = release_write_lock
---|
| 96 | |
---|
| 97 | |
---|
class SynchronizerImpl(object):
    """Base class providing re-entrant read/write lock bookkeeping.

    The real locking is delegated to four hooks that subclasses must
    implement: ``do_acquire_read_lock(wait)``,
    ``do_acquire_write_lock(wait)``, ``do_release_read_lock()`` and
    ``do_release_write_lock()``.  This class only tracks, in a
    ``SyncState`` kept in a ``util.ThreadLocal`` (so presumably one per
    thread), how many times the lock was acquired and whether it is held
    for reading or for writing.  The hooks are invoked only for the
    outermost acquire and the matching final release.
    """

    def __init__(self):
        self._state = util.ThreadLocal()

    class SyncState(object):
        """Bookkeeping record: nesting depth plus the active lock mode."""
        __slots__ = 'reentrantcount', 'writing', 'reading'

        def __init__(self):
            self.reentrantcount = 0
            self.writing = False
            self.reading = False

    @property
    def state(self):
        """The ``SyncState`` stored in ``self._state``, created on first use."""
        if not self._state.has():
            state = SynchronizerImpl.SyncState()
            self._state.put(state)
            return state
        return self._state.get()

    def release_read_lock(self):
        """Undo one ``acquire_read_lock``.

        Raises ``LockError`` if the lock is held for writing or is not
        held for reading.
        """
        state = self.state

        if state.writing:
            raise LockError("lock is in writing state")
        if not state.reading:
            raise LockError("lock is not in reading state")

        # Only the outermost release touches the real lock.
        if state.reentrantcount == 1:
            self.do_release_read_lock()
            state.reading = False

        state.reentrantcount -= 1

    def acquire_read_lock(self, wait=True):
        """Acquire the lock for reading, re-entrantly.

        Returns the result of ``do_acquire_read_lock(wait)`` for the
        outermost acquisition and ``True`` for nested ones.  Raises
        ``LockError`` if the lock is currently held for writing.
        """
        state = self.state

        if state.writing:
            raise LockError("lock is in writing state")

        if state.reentrantcount == 0:
            x = self.do_acquire_read_lock(wait)
            # A blocking call is counted as acquired whatever the hook
            # returned; a non-blocking one only when the hook said so.
            if wait or x:
                state.reentrantcount += 1
                state.reading = True
            return x
        elif state.reading:
            state.reentrantcount += 1
            return True

    def release_write_lock(self):
        """Undo one ``acquire_write_lock``.

        Raises ``LockError`` if the lock is held for reading or is not
        held for writing.
        """
        state = self.state

        if state.reading:
            raise LockError("lock is in reading state")
        if not state.writing:
            raise LockError("lock is not in writing state")

        # Only the outermost release touches the real lock.
        if state.reentrantcount == 1:
            self.do_release_write_lock()
            state.writing = False

        state.reentrantcount -= 1

    release = release_write_lock

    def acquire_write_lock(self, wait=True):
        """Acquire the lock for writing, re-entrantly.

        Returns the result of ``do_acquire_write_lock(wait)`` for the
        outermost acquisition and ``True`` for nested ones.  Raises
        ``LockError`` if the lock is currently held for reading.
        """
        state = self.state

        if state.reading:
            raise LockError("lock is in reading state")

        if state.reentrantcount == 0:
            x = self.do_acquire_write_lock(wait)
            # Same rule as for read locks: blocking calls always count.
            if wait or x:
                state.reentrantcount += 1
                state.writing = True
            return x
        elif state.writing:
            state.reentrantcount += 1
            return True

    acquire = acquire_write_lock

    # -- hooks for subclasses ---------------------------------------
    # The acquire hooks take ``wait`` because acquire_read_lock() and
    # acquire_write_lock() always pass it; without the parameter an
    # un-overridden hook would fail with TypeError rather than the
    # intended NotImplementedError.

    def do_release_read_lock(self):
        raise NotImplementedError()

    def do_acquire_read_lock(self, wait=True):
        raise NotImplementedError()

    def do_release_write_lock(self):
        raise NotImplementedError()

    def do_acquire_write_lock(self, wait=True):
        raise NotImplementedError()
---|
| 194 | |
---|
| 195 | |
---|
class FileSynchronizer(SynchronizerImpl):
    """A synchronizer which locks using flock().

    Adapted for Python/multithreads from Apache::Session::Lock::File,
    http://search.cpan.org/src/CWEST/Apache-Session-1.81/Session/Lock/File.pm

    This module does not unlink temporary files, because doing so
    interferes with proper locking.  This can cause problems on certain
    systems (Linux) whose file systems (ext2) do not perform well with
    lots of files in one directory.  To prevent this you should use a
    script to clean out old files from your lock directory.

    """

    def __init__(self, identifier, lock_dir):
        """Compute the lock file path for ``identifier``.

        ``lock_dir`` -- directory for the lock file; ``None`` selects
        ``tempfile.gettempdir()``.
        """
        super(FileSynchronizer, self).__init__()
        self._filedescriptor = util.ThreadLocal()

        directory = tempfile.gettempdir() if lock_dir is None else lock_dir
        self.filename = util.encoded_path(
            directory, [identifier], extension='.lock')

    @property
    def _filedesc(self):
        """Whatever ``self._filedescriptor.get()`` currently returns."""
        return self._filedescriptor.get()

    def _open(self, mode):
        """Return the stored descriptor, opening the lock file if needed."""
        descriptor = self._filedesc
        if descriptor is not None:
            return descriptor
        descriptor = os.open(self.filename, mode)
        self._filedescriptor.put(descriptor)
        return descriptor

    def _acquire_flock(self, exclusive, wait):
        """Open the lock file and flock() it shared or exclusive.

        Returns True once the lock is held.  In non-blocking mode
        (``wait`` false) an IOError from flock() closes the descriptor,
        removes it from ``self._filedescriptor`` and yields False.
        """
        access = os.O_WRONLY if exclusive else os.O_RDONLY
        descriptor = self._open(os.O_CREAT | access)
        operation = fcntl.LOCK_EX if exclusive else fcntl.LOCK_SH

        if wait:
            fcntl.flock(descriptor, operation)
            return True

        try:
            fcntl.flock(descriptor, operation | fcntl.LOCK_NB)
        except IOError:
            os.close(descriptor)
            self._filedescriptor.remove()
            return False
        return True

    def do_acquire_read_lock(self, wait):
        return self._acquire_flock(False, wait)

    def do_acquire_write_lock(self, wait):
        return self._acquire_flock(True, wait)

    def do_release_read_lock(self):
        self._release_all_locks()

    def do_release_write_lock(self):
        self._release_all_locks()

    def _release_all_locks(self):
        """Unlock and close the stored descriptor, if there is one."""
        descriptor = self._filedesc
        if descriptor is None:
            return
        fcntl.flock(descriptor, fcntl.LOCK_UN)
        os.close(descriptor)
        self._filedescriptor.remove()
---|
| 275 | |
---|
| 276 | |
---|
class ConditionSynchronizer(SynchronizerImpl):
    """A synchronizer using a Condition.

    Any number of readers ("asynchronous" operations) may hold the lock
    together; a writer (the "synchronous" operation) first claims
    ``current_sync_operation`` and then waits until no readers remain.
    All shared counters are only touched while ``self.condition`` is
    held.
    """

    def __init__(self, identifier):
        """Set up the counters and the condition.

        ``identifier`` is accepted for signature compatibility with the
        other synchronizers and is not used here.
        """
        super(ConditionSynchronizer, self).__init__()

        # counts how many asynchronous (reader) operations are executing.
        # Spelled ``asynch`` because ``async`` is a reserved keyword from
        # Python 3.7 on and cannot be used as an attribute name.
        self.asynch = 0

        # pointer to thread that is the current sync operation
        self.current_sync_operation = None

        # condition object to lock on
        self.condition = _threading.Condition(_threading.Lock())

    def do_acquire_read_lock(self, wait=True):
        """Register the caller as a reader.

        Blocks while a writer is active or pending when ``wait`` is
        true; otherwise returns False immediately in that situation.
        Returns True once the reader has been counted.
        """
        with self.condition:
            # see if a synchronous operation is waiting to start
            # or is already running, in which case we wait (or just
            # give up and return)
            if wait:
                while self.current_sync_operation is not None:
                    self.condition.wait()
            elif self.current_sync_operation is not None:
                return False

            self.asynch += 1

        return True

    def do_release_read_lock(self):
        """Unregister the caller as a reader.

        Raises ``LockError`` when more releases than acquisitions were
        made.
        """
        with self.condition:
            self.asynch -= 1

            # check if we are the last asynchronous reader thread
            # out the door.
            if self.asynch == 0:
                # yes. so if a sync operation is waiting, wake it up
                if self.current_sync_operation is not None:
                    self.condition.notify_all()
            elif self.asynch < 0:
                raise LockError("Synchronizer error - too many "
                                "release_read_locks called")

    def do_acquire_write_lock(self, wait=True):
        """Become the single writer.

        Blocks until no other writer is active and all readers have
        left when ``wait`` is true; otherwise returns False as soon as
        either condition is not met.  Returns True once the write lock
        is held.
        """
        with self.condition:
            # here, we are not a synchronous reader, and after returning,
            # assuming waiting or immediate availability, we will be.
            if wait:
                # if another sync is working, wait
                while self.current_sync_operation is not None:
                    self.condition.wait()
            elif self.current_sync_operation is not None:
                # if another sync is working,
                # we dont want to wait, so forget it
                return False

            # establish ourselves as the current sync
            # this indicates to other read/write operations
            # that they should wait until this is None again
            self.current_sync_operation = _threading.current_thread()

            # now wait again for asyncs to finish
            if self.asynch > 0:
                if not wait:
                    # we dont want to wait, so forget it
                    self.current_sync_operation = None
                    return False
                # Re-check the counter after every wake-up: wait() may
                # return without the readers actually being gone.
                while self.asynch > 0:
                    self.condition.wait()

        return True

    def do_release_write_lock(self):
        """Give up the write lock and wake every waiting thread.

        Raises ``LockError`` if the calling thread is not the one
        recorded in ``current_sync_operation``.
        """
        with self.condition:
            if self.current_sync_operation is not _threading.current_thread():
                raise LockError("Synchronizer error - current thread doesnt "
                                "have the write lock")

            # reset the current sync operation so
            # another can get it
            self.current_sync_operation = None

            # tell everyone to get ready; they proceed once the
            # condition is released on leaving the ``with`` block
            self.condition.notify_all()
---|