root/galaxy-central/eggs/SQLAlchemy-0.5.6_dev_r6498-py2.6.egg/sqlalchemy/queue.py

リビジョン 3, 6.1 KB (コミッタ: kohda, 14 年 前)

Install Unix tools  http://hannonlab.cshl.edu/galaxy_unix_tools/galaxy.html

行番号 
1"""An adaptation of Py2.3/2.4's Queue module which supports reentrant
2behavior, using RLock instead of Lock for its mutex object.
3
4This is to support the connection pool's usage of weakref callbacks to return
5connections to the underlying Queue, which can apparently in extremely
6rare cases be invoked within the ``get()`` method of the Queue itself,
7producing a ``put()`` inside the ``get()`` and therefore a reentrant
8condition."""
9
10from collections import deque
11from time import time as _time
12from sqlalchemy.util import threading
13
14__all__ = ['Empty', 'Full', 'Queue']
15
class Empty(Exception):
    """Signal that no item could be taken from the queue.

    Raised by ``Queue.get(block=0)`` / ``Queue.get_nowait()`` when the
    queue is empty, and by a blocking ``Queue.get()`` whose timeout
    expires before an item becomes available.
    """
20
class Full(Exception):
    """Signal that no item could be added to the queue.

    Raised by ``Queue.put(block=0)`` / ``Queue.put_nowait()`` when the
    queue is full, and by a blocking ``Queue.put()`` whose timeout
    expires before a free slot becomes available.
    """
25
class Queue:
    """FIFO queue whose mutex is a reentrant lock (``RLock``).

    The ``not_empty`` and ``not_full`` conditions share one
    ``threading.RLock``.  A thread that already holds that lock (for
    example a ``put()`` triggered while the same thread is inside
    ``get()``) can therefore enter another queue method without
    blocking on the mutex.

    Subclasses may override ``_init``, ``_qsize``, ``_empty``, ``_full``,
    ``_put`` and ``_get`` to implement other queue organizations.  Apart
    from ``_init`` (called from ``__init__`` before the lock exists),
    the public methods invoke these hooks with ``mutex`` held.
    """

    def __init__(self, maxsize=0):
        """Initialize a queue object with a given maximum size.

        If `maxsize` is <= 0, the queue size is infinite.
        """

        self._init(maxsize)
        # mutex must be held whenever the queue is mutating.  All methods
        # that acquire mutex must release it before returning.  mutex
        # is shared between the two conditions, so acquiring and
        # releasing the conditions also acquires and releases mutex.
        self.mutex = threading.RLock()
        # Notify not_empty whenever an item is added to the queue; a
        # thread waiting to get is notified then.
        self.not_empty = threading.Condition(self.mutex)
        # Notify not_full whenever an item is removed from the queue;
        # a thread waiting to put is notified then.
        self.not_full = threading.Condition(self.mutex)

    def qsize(self):
        """Return the approximate size of the queue (not reliable!)."""

        self.mutex.acquire()
        try:
            # try/finally so an overridden _qsize() that raises cannot
            # leave the mutex held.
            return self._qsize()
        finally:
            self.mutex.release()

    def empty(self):
        """Return True if the queue is empty, False otherwise (not reliable!)."""

        self.mutex.acquire()
        try:
            # try/finally so an overridden _empty() that raises cannot
            # leave the mutex held.
            return self._empty()
        finally:
            self.mutex.release()

    def full(self):
        """Return True if the queue is full, False otherwise (not reliable!)."""

        self.mutex.acquire()
        try:
            # try/finally so an overridden _full() that raises cannot
            # leave the mutex held.
            return self._full()
        finally:
            self.mutex.release()

    def put(self, item, block=True, timeout=None):
        """Put an item into the queue.

        If optional args `block` is True and `timeout` is None (the
        default), block if necessary until a free slot is
        available. If `timeout` is a non-negative number, it blocks at
        most `timeout` seconds and raises the ``Full`` exception if no
        free slot was available within that time.  Otherwise (`block`
        is false), put an item on the queue if a free slot is
        immediately available, else raise the ``Full`` exception
        (`timeout` is ignored in that case).

        Raises ``ValueError`` if `block` is true and `timeout` is negative.
        """

        self.not_full.acquire()
        try:
            if not block:
                if self._full():
                    raise Full
            elif timeout is None:
                # Re-check after every wake-up: another thread may have
                # taken the free slot first.
                while self._full():
                    self.not_full.wait()
            else:
                if timeout < 0:
                    raise ValueError("'timeout' must be a positive number")
                # Track an absolute deadline so repeated wake-ups do not
                # extend the total time spent waiting.
                endtime = _time() + timeout
                while self._full():
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Full
                    self.not_full.wait(remaining)
            self._put(item)
            self.not_empty.notify()
        finally:
            self.not_full.release()

    def put_nowait(self, item):
        """Put an item into the queue without blocking.

        Only enqueue the item if a free slot is immediately available.
        Otherwise raise the ``Full`` exception.
        """
        return self.put(item, False)

    def get(self, block=True, timeout=None):
        """Remove and return an item from the queue.

        If optional args `block` is True and `timeout` is None (the
        default), block if necessary until an item is available. If
        `timeout` is a non-negative number, it blocks at most `timeout`
        seconds and raises the ``Empty`` exception if no item was
        available within that time.  Otherwise (`block` is false),
        return an item if one is immediately available, else raise the
        ``Empty`` exception (`timeout` is ignored in that case).

        Raises ``ValueError`` if `block` is true and `timeout` is negative.
        """

        self.not_empty.acquire()
        try:
            if not block:
                if self._empty():
                    raise Empty
            elif timeout is None:
                # Re-check after every wake-up: another thread may have
                # consumed the item first.
                while self._empty():
                    self.not_empty.wait()
            else:
                if timeout < 0:
                    raise ValueError("'timeout' must be a positive number")
                # Track an absolute deadline so repeated wake-ups do not
                # extend the total time spent waiting.
                endtime = _time() + timeout
                while self._empty():
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)
            item = self._get()
            self.not_full.notify()
            return item
        finally:
            self.not_empty.release()

    def get_nowait(self):
        """Remove and return an item from the queue without blocking.

        Only get an item if one is immediately available. Otherwise
        raise the ``Empty`` exception.
        """

        return self.get(False)

    # Override these methods to implement other queue organizations
    # (e.g. stack or priority queue).
    # These will only be called with appropriate locks held

    # Initialize the queue representation
    def _init(self, maxsize):
        self.maxsize = maxsize
        self.queue = deque()

    def _qsize(self):
        return len(self.queue)

    # Check whether the queue is empty
    def _empty(self):
        return not self.queue

    # Check whether the queue is full
    def _full(self):
        # ">=" rather than "==": a queue that somehow holds more than
        # maxsize items (e.g. maxsize lowered after items were added) must
        # still report full instead of accepting items without bound.
        return self.maxsize > 0 and len(self.queue) >= self.maxsize

    # Put a new item in the queue
    def _put(self, item):
        self.queue.append(item)

    # Get an item from the queue
    def _get(self):
        return self.queue.popleft()
Note: リポジトリブラウザについてのヘルプは TracBrowser を参照してください。