root/galaxy-central/eggs/SQLAlchemy-0.5.6_dev_r6498-py2.6.egg/sqlalchemy/pool.py

リビジョン 3, 32.3 KB (コミッタ: kohda, 14 年 前)

Install Unix tools  http://hannonlab.cshl.edu/galaxy_unix_tools/galaxy.html

行番号 
1# pool.py - Connection pooling for SQLAlchemy
2# Copyright (C) 2005, 2006, 2007, 2008, 2009 Michael Bayer mike_mp@zzzcomputing.com
3#
4# This module is part of SQLAlchemy and is released under
5# the MIT License: http://www.opensource.org/licenses/mit-license.php
6
7
8"""Connection pooling for DB-API connections.
9
10Provides a number of connection pool implementations for a variety of
11usage scenarios and thread behavior requirements imposed by the
12application, DB-API or database itself.
13
14Also provides a DB-API 2.0 connection proxying mechanism allowing
15regular DB-API connect() methods to be transparently managed by a
16SQLAlchemy connection pool.
17"""
18
19import weakref, time, threading
20
21from sqlalchemy import exc, log
22from sqlalchemy import queue as Queue
23from sqlalchemy.util import threading, pickle, as_interface
24
25proxies = {}
26
def manage(module, **params):
    r"""Return a proxy for a DB-API module that automatically pools connections.

    Given a DB-API 2.0 module and pool management parameters, returns
    a proxy for the module that will automatically pool connections,
    creating new connection pools for each distinct set of connection
    arguments sent to the decorated module's connect() function.

    Proxies are cached per module in the module-level ``proxies``
    dictionary.  When *module* already has a proxy, that proxy is returned
    unchanged and ``params`` are ignored.

    :param module: a DB-API 2.0 database module

    :param poolclass: the class used by the pool module to provide
      pooling.  Defaults to :class:`QueuePool`.

    :param \*\*params: will be passed through to *poolclass*

    """
    try:
        return proxies[module]
    except KeyError:
        # setdefault() hands back whatever is registered under the key, so
        # if an entry appeared since the failed lookup it is the one returned.
        return proxies.setdefault(module, _DBProxy(module, **params))
47
def clear_managers():
    """Remove all current DB-API 2.0 managers.

    Every proxy held in the module-level ``proxies`` registry is closed,
    after which the registry itself is emptied.
    """
    registered = proxies.values()
    for proxy in registered:
        proxy.close()

    proxies.clear()
57
class Pool(object):
    """Abstract base class for connection pools.

    Subclasses provide the storage policy by implementing :meth:`do_get`,
    :meth:`do_return_conn`, :meth:`status`, :meth:`recreate` and
    :meth:`dispose`; every one of them raises ``NotImplementedError`` here.
    """

    def __init__(self, creator, recycle=-1, echo=None, use_threadlocal=False,
                 reset_on_return=True, listeners=None):
        """
        Construct a Pool.

        :param creator: a callable function that returns a DB-API
          connection object.  The function will be called with
          parameters.

        :param recycle: If set to non -1, number of seconds between
          connection recycling, which means upon checkout, if this
          timeout is surpassed the connection will be closed and
          replaced with a newly opened connection. Defaults to -1.

        :param echo: If True, connections being pulled and retrieved
          from the pool will be logged to the standard output, as well
          as pool sizing information.  Echoing can also be achieved by
          enabling logging for the "sqlalchemy.pool"
          namespace. Defaults to False.

        :param use_threadlocal: If set to True, repeated calls to
          :meth:`connect` within the same application thread will be
          guaranteed to return the same connection object, if one has
          already been retrieved from the pool and has not been
          returned yet.  Offers a slight performance advantage at the
          cost of individual transactions by default.  The
          :meth:`unique_connection` method is provided to bypass the
          threadlocal behavior installed into :meth:`connect`.

        :param reset_on_return: If true, reset the database state of
          connections returned to the pool.  This is typically a
          ROLLBACK to release locks and transaction resources.
          Disable at your own peril.  Defaults to True.

        :param listeners: A list of
          :class:`~sqlalchemy.interfaces.PoolListener`-like objects or
          dictionaries of callables that receive events when DB-API
          connections are created, checked out and checked in to the
          pool.

        """
        # NOTE(review): ``_should_log_info`` is read throughout this module
        # but never assigned in this class; presumably log.instance_logger()
        # installs it on the instance -- confirm against sqlalchemy.log.
        self.logger = log.instance_logger(self, echoflag=echo)
        self._threadconns = threading.local()
        self._creator = creator
        self._recycle = recycle
        self._use_threadlocal = use_threadlocal
        self._reset_on_return = reset_on_return
        self.echo = echo
        self.listeners = []
        self._on_connect = []
        self._on_checkout = []
        self._on_checkin = []

        if listeners:
            for l in listeners:
                self.add_listener(l)

    def unique_connection(self):
        """Check out a connection, bypassing the thread-local shortcut."""
        return _ConnectionFairy(self).checkout()

    def create_connection(self):
        """Return a new :class:`_ConnectionRecord` bound to this pool."""
        return _ConnectionRecord(self)

    def recreate(self):
        """Return a new instance with identical creation arguments."""

        raise NotImplementedError()

    def dispose(self):
        """Dispose of this pool.

        This method leaves the possibility of checked-out connections
        remaining open.  It is advised to not reuse the pool once dispose()
        is called, and to instead use a new pool constructed by the
        recreate() method.
        """

        raise NotImplementedError()

    def connect(self):
        """Check out a connection from the pool.

        Without ``use_threadlocal`` each call wraps a record obtained from
        :meth:`get` in a new :class:`_ConnectionFairy`.  With it, the fairy
        stored for the current thread is reused for as long as the weak
        reference to it is alive; otherwise a new fairy is created and
        stored.
        """
        if not self._use_threadlocal:
            return _ConnectionFairy(self).checkout()

        # Only the thread-local lookup is guarded: an AttributeError raised
        # by checkout() itself must reach the caller rather than be mistaken
        # for "nothing stored yet" and trigger a second checkout.
        try:
            rec = self._threadconns.current()
        except AttributeError:
            rec = None
        if rec is not None:
            return rec.checkout()

        agent = _ConnectionFairy(self)
        # Weak reference, so the pool does not keep the fairy alive.
        self._threadconns.current = weakref.ref(agent)
        return agent.checkout()

    def return_conn(self, record):
        """Hand *record* back to the pool via :meth:`do_return_conn`.

        In thread-local mode the calling thread's stored fairy reference
        is dropped first.
        """
        if self._use_threadlocal and hasattr(self._threadconns, "current"):
            del self._threadconns.current
        self.do_return_conn(record)

    def get(self):
        """Return a connection record obtained from :meth:`do_get`."""
        return self.do_get()

    def do_get(self):
        """Subclass hook: produce a connection record for checkout."""
        raise NotImplementedError()

    def do_return_conn(self, conn):
        """Subclass hook: accept a connection record being checked in."""
        raise NotImplementedError()

    def status(self):
        """Subclass hook: describe the pool's current state."""
        raise NotImplementedError()

    def add_listener(self, listener):
        """Add a ``PoolListener``-like object to this pool.

        ``listener`` may be an object that implements some or all of
        PoolListener, or a dictionary of callables containing implementations
        of some or all of the named methods in PoolListener.

        """

        listener = as_interface(
            listener, methods=('connect', 'checkout', 'checkin'))

        self.listeners.append(listener)
        # Index the listener under each event it actually implements.
        if hasattr(listener, 'connect'):
            self._on_connect.append(listener)
        if hasattr(listener, 'checkout'):
            self._on_checkout.append(listener)
        if hasattr(listener, 'checkin'):
            self._on_checkin.append(listener)

    def log(self, msg):
        """Emit *msg* at INFO level on this pool's logger."""
        self.logger.info(msg)
194
195class _ConnectionRecord(object):
196    def __init__(self, pool):
197        self.__pool = pool
198        self.connection = self.__connect()
199        self.info = {}
200        if pool._on_connect:
201            for l in pool._on_connect:
202                l.connect(self.connection, self)
203
204    def close(self):
205        if self.connection is not None:
206            if self.__pool._should_log_info:
207                self.__pool.log("Closing connection %r" % self.connection)
208            try:
209                self.connection.close()
210            except (SystemExit, KeyboardInterrupt):
211                raise
212            except:
213                if self.__pool._should_log_info:
214                    self.__pool.log("Exception closing connection %r" %
215                                    self.connection)
216
217    def invalidate(self, e=None):
218        if self.__pool._should_log_info:
219            if e is not None:
220                self.__pool.log("Invalidate connection %r (reason: %s:%s)" %
221                                (self.connection, e.__class__.__name__, e))
222            else:
223                self.__pool.log("Invalidate connection %r" % self.connection)
224        self.__close()
225        self.connection = None
226
227    def get_connection(self):
228        if self.connection is None:
229            self.connection = self.__connect()
230            self.info.clear()
231            if self.__pool._on_connect:
232                for l in self.__pool._on_connect:
233                    l.connect(self.connection, self)
234        elif (self.__pool._recycle > -1 and time.time() - self.starttime > self.__pool._recycle):
235            if self.__pool._should_log_info:
236                self.__pool.log("Connection %r exceeded timeout; recycling" %
237                                self.connection)
238            self.__close()
239            self.connection = self.__connect()
240            self.info.clear()
241            if self.__pool._on_connect:
242                for l in self.__pool._on_connect:
243                    l.connect(self.connection, self)
244        return self.connection
245
246    def __close(self):
247        try:
248            if self.__pool._should_log_info:
249                self.__pool.log("Closing connection %r" % self.connection)
250            self.connection.close()
251        except Exception, e:
252            if self.__pool._should_log_info:
253                self.__pool.log("Connection %r threw an error on close: %s" %
254                                (self.connection, e))
255            if isinstance(e, (SystemExit, KeyboardInterrupt)):
256                raise
257
258    def __connect(self):
259        try:
260            self.starttime = time.time()
261            connection = self.__pool._creator()
262            if self.__pool._should_log_info:
263                self.__pool.log("Created new connection %r" % connection)
264            return connection
265        except Exception, e:
266            if self.__pool._should_log_info:
267                self.__pool.log("Error on connect(): %s" % e)
268            raise
269
270
271def _finalize_fairy(connection, connection_record, pool, ref=None):
272    if ref is not None and connection_record.backref is not ref:
273        return
274    if connection is not None:
275        try:
276            if pool._reset_on_return:
277                connection.rollback()
278            # Immediately close detached instances
279            if connection_record is None:
280                connection.close()
281        except Exception, e:
282            if connection_record is not None:
283                connection_record.invalidate(e=e)
284            if isinstance(e, (SystemExit, KeyboardInterrupt)):
285                raise
286    if connection_record is not None:
287        connection_record.backref = None
288        if pool._should_log_info:
289            pool.log("Connection %r being returned to pool" % connection)
290        if pool._on_checkin:
291            for l in pool._on_checkin:
292                l.checkin(connection, connection_record)
293        pool.return_conn(connection_record)
294
295class _ConnectionFairy(object):
296    """Proxies a DB-API connection and provides return-on-dereference support."""
297
298    __slots__ = '_pool', '__counter', 'connection', '_connection_record', '__weakref__', '_detached_info'
299   
300    def __init__(self, pool):
301        self._pool = pool
302        self.__counter = 0
303        try:
304            rec = self._connection_record = pool.get()
305            conn = self.connection = self._connection_record.get_connection()
306            self._connection_record.backref = weakref.ref(self, lambda ref:_finalize_fairy(conn, rec, pool, ref))
307        except:
308            self.connection = None # helps with endless __getattr__ loops later on
309            self._connection_record = None
310            raise
311        if self._pool._should_log_info:
312            self._pool.log("Connection %r checked out from pool" %
313                           self.connection)
314
315    @property
316    def _logger(self):
317        return self._pool.logger
318
319    @property
320    def is_valid(self):
321        return self.connection is not None
322
323    @property
324    def info(self):
325        """An info collection unique to this DB-API connection."""
326
327        try:
328            return self._connection_record.info
329        except AttributeError:
330            if self.connection is None:
331                raise exc.InvalidRequestError("This connection is closed")
332            try:
333                return self._detached_info
334            except AttributeError:
335                self._detached_info = value = {}
336                return value
337
338    def invalidate(self, e=None):
339        """Mark this connection as invalidated.
340
341        The connection will be immediately closed.  The containing
342        ConnectionRecord will create a new connection when next used.
343        """
344
345        if self.connection is None:
346            raise exc.InvalidRequestError("This connection is closed")
347        if self._connection_record is not None:
348            self._connection_record.invalidate(e=e)
349        self.connection = None
350        self._close()
351
352    def cursor(self, *args, **kwargs):
353        try:
354            c = self.connection.cursor(*args, **kwargs)
355            return _CursorFairy(self, c)
356        except Exception, e:
357            self.invalidate(e=e)
358            raise
359
360    def __getattr__(self, key):
361        return getattr(self.connection, key)
362
363    def checkout(self):
364        if self.connection is None:
365            raise exc.InvalidRequestError("This connection is closed")
366        self.__counter += 1
367
368        if not self._pool._on_checkout or self.__counter != 1:
369            return self
370
371        # Pool listeners can trigger a reconnection on checkout
372        attempts = 2
373        while attempts > 0:
374            try:
375                for l in self._pool._on_checkout:
376                    l.checkout(self.connection, self._connection_record, self)
377                return self
378            except exc.DisconnectionError, e:
379                if self._pool._should_log_info:
380                    self._pool.log(
381                    "Disconnection detected on checkout: %s" % e)
382                self._connection_record.invalidate(e)
383                self.connection = self._connection_record.get_connection()
384                attempts -= 1
385
386        if self._pool._should_log_info:
387            self._pool.log("Reconnection attempts exhausted on checkout")
388        self.invalidate()
389        raise exc.InvalidRequestError("This connection is closed")
390
391    def detach(self):
392        """Separate this connection from its Pool.
393
394        This means that the connection will no longer be returned to the
395        pool when closed, and will instead be literally closed.  The
396        containing ConnectionRecord is separated from the DB-API connection,
397        and will create a new connection when next used.
398
399        Note that any overall connection limiting constraints imposed by a
400        Pool implementation may be violated after a detach, as the detached
401        connection is removed from the pool's knowledge and control.
402        """
403
404        if self._connection_record is not None:
405            self._connection_record.connection = None
406            self._connection_record.backref = None
407            self._pool.do_return_conn(self._connection_record)
408            self._detached_info = \
409              self._connection_record.info.copy()
410            self._connection_record = None
411
412    def close(self):
413        self.__counter -= 1
414        if self.__counter == 0:
415            self._close()
416
417    def _close(self):
418        _finalize_fairy(self.connection, self._connection_record, self._pool)
419        self.connection = None
420        self._connection_record = None
421
422class _CursorFairy(object):
423    __slots__ = '__parent', 'cursor', 'execute'
424
425    def __init__(self, parent, cursor):
426        self.__parent = parent
427        self.cursor = cursor
428        self.execute = cursor.execute
429       
430    def invalidate(self, e=None):
431        self.__parent.invalidate(e=e)
432
433    def close(self):
434        try:
435            self.cursor.close()
436        except Exception, e:
437            try:
438                ex_text = str(e)
439            except TypeError:
440                ex_text = repr(e)
441            self.__parent._logger.warn("Error closing cursor: " + ex_text)
442
443            if isinstance(e, (SystemExit, KeyboardInterrupt)):
444                raise
445
446    def __getattr__(self, key):
447        return getattr(self.cursor, key)
448
class SingletonThreadPool(Pool):
    """A Pool that maintains one connection per thread.

    Maintains one connection per each thread, never moving a connection to a
    thread other than the one which it was created in.

    This is used for SQLite, which both does not handle multithreading by
    default, and also requires a singleton connection if a :memory: database
    is being used.

    Options are the same as those of :class:`Pool`, as well as:

    :param pool_size: The number of threads in which to maintain connections
        at once.  Defaults to five.

    """

    def __init__(self, creator, pool_size=5, **params):
        # This pool always runs in thread-local mode.
        params['use_threadlocal'] = True
        Pool.__init__(self, creator, **params)
        self._conn = threading.local()
        self._all_conns = set()
        self.size = pool_size

    def recreate(self):
        """Return a new SingletonThreadPool with this pool's settings."""
        self.log("Pool recreating")
        # Pass the original ``echo`` value and ``reset_on_return`` so the
        # new pool is configured exactly like this one.
        return SingletonThreadPool(self._creator,
            pool_size=self.size,
            recycle=self._recycle,
            echo=self.echo,
            use_threadlocal=self._use_threadlocal,
            reset_on_return=self._reset_on_return,
            listeners=self.listeners)

    def dispose(self):
        """Dispose of this pool."""

        for conn in self._all_conns:
            try:
                conn.close()
            except (SystemExit, KeyboardInterrupt):
                raise
            except Exception:
                # pysqlite won't even let you close a conn from a thread
                # that didn't create it
                pass

        self._all_conns.clear()

    def dispose_local(self):
        """Forget the calling thread's connection record, if it has one."""
        if hasattr(self._conn, 'current'):
            conn = self._conn.current()
            self._all_conns.discard(conn)
            del self._conn.current

    def cleanup(self):
        """Drop records from the pool's set until it fits ``size`` again.

        Records are only discarded from the set, not closed here.
        """
        for conn in list(self._all_conns):
            self._all_conns.discard(conn)
            if len(self._all_conns) <= self.size:
                return

    def status(self):
        return "SingletonThreadPool id:%d size: %d" % (id(self), len(self._all_conns))

    def do_return_conn(self, conn):
        # Nothing to do: the record stays bound to its thread.
        pass

    def do_get(self):
        """Return this thread's record, creating one on first use."""
        try:
            c = self._conn.current()
        except AttributeError:
            # Nothing stored for this thread yet.
            c = None
        if c is not None:
            return c
        c = self.create_connection()
        # The thread-local slot only holds a weak reference; the strong
        # reference lives in ``_all_conns``.
        self._conn.current = weakref.ref(c)
        self._all_conns.add(c)
        if len(self._all_conns) > self.size:
            self.cleanup()
        return c
528
class QueuePool(Pool):
    """A Pool that imposes a limit on the number of open connections."""

    def __init__(self, creator, pool_size=5, max_overflow=10, timeout=30,
                 **params):
        """
        Construct a QueuePool.

        :param creator: a callable function that returns a DB-API
          connection object.  The function will be called with
          parameters.

        :param pool_size: The size of the pool to be maintained. This
          is the largest number of connections that will be kept
          persistently in the pool. Note that the pool begins with no
          connections; once this number of connections is requested,
          that number of connections will remain. Defaults to 5.

        :param max_overflow: The maximum overflow size of the
          pool. When the number of checked-out connections reaches the
          size set in pool_size, additional connections will be
          returned up to this limit. When those additional connections
          are returned to the pool, they are disconnected and
          discarded. It follows then that the total number of
          simultaneous connections the pool will allow is pool_size +
          `max_overflow`, and the total number of "sleeping"
          connections the pool will allow is pool_size. `max_overflow`
          can be set to -1 to indicate no overflow limit; no limit
          will be placed on the total number of concurrent
          connections. Defaults to 10.

        :param timeout: The number of seconds to wait before giving up
          on returning a connection. Defaults to 30.

        :param recycle: If set to non -1, number of seconds between
          connection recycling, which means upon checkout, if this
          timeout is surpassed the connection will be closed and
          replaced with a newly opened connection. Defaults to -1.

        :param echo: If True, connections being pulled and retrieved
          from the pool will be logged to the standard output, as well
          as pool sizing information.  Echoing can also be achieved by
          enabling logging for the "sqlalchemy.pool"
          namespace. Defaults to False.

        :param use_threadlocal: If set to True, repeated calls to
          :meth:`connect` within the same application thread will be
          guaranteed to return the same connection object, if one has
          already been retrieved from the pool and has not been
          returned yet.  Offers a slight performance advantage at the
          cost of individual transactions by default.  The
          :meth:`unique_connection` method is provided to bypass the
          threadlocal behavior installed into :meth:`connect`.

        :param reset_on_return: If true, reset the database state of
          connections returned to the pool.  This is typically a
          ROLLBACK to release locks and transaction resources.
          Disable at your own peril.  Defaults to True.

        :param listeners: A list of
          :class:`~sqlalchemy.interfaces.PoolListener`-like objects or
          dictionaries of callables that receive events when DB-API
          connections are created, checked out and checked in to the
          pool.

        """
        Pool.__init__(self, creator, **params)
        self._pool = Queue.Queue(pool_size)
        # Starts at -pool_size and is incremented for every connection
        # created, so a positive value counts connections beyond pool_size.
        self._overflow = 0 - pool_size
        self._max_overflow = max_overflow
        self._timeout = timeout
        # A lock is only created when an overflow limit is in force.
        if self._max_overflow > -1:
            self._overflow_lock = threading.Lock()
        else:
            self._overflow_lock = None

    def recreate(self):
        """Return a new QueuePool with this pool's settings."""
        self.log("Pool recreating")
        # Pass the original ``echo`` value and ``reset_on_return`` so the
        # new pool is configured exactly like this one.
        return QueuePool(self._creator,
            pool_size=self._pool.maxsize,
            max_overflow=self._max_overflow,
            timeout=self._timeout,
            recycle=self._recycle,
            echo=self.echo,
            use_threadlocal=self._use_threadlocal,
            reset_on_return=self._reset_on_return,
            listeners=self.listeners)

    def do_return_conn(self, conn):
        """Put *conn* back on the queue, or discard it if the queue is full.

        A record that does not fit is an overflow connection: it is closed
        right away and the overflow counter is decremented.
        """
        try:
            self._pool.put(conn, False)
        except Queue.Full:
            conn.close()
            if self._overflow_lock is None:
                self._overflow -= 1
            else:
                self._overflow_lock.acquire()
                try:
                    self._overflow -= 1
                finally:
                    self._overflow_lock.release()

    def do_get(self):
        """Return a pooled record, or create one while overflow allows it.

        :raises TimeoutError: when the overflow limit is reached and no
          record became available within ``timeout`` seconds.
        """
        try:
            # Block on the queue only when no further connection may be
            # created; otherwise an empty queue means "create one now".
            wait = self._max_overflow > -1 and self._overflow >= self._max_overflow
            return self._pool.get(wait, self._timeout)
        except Queue.Empty:
            if self._max_overflow > -1 and self._overflow >= self._max_overflow:
                if not wait:
                    # The limit was reached after ``wait`` was computed;
                    # start over, this time blocking on the queue.
                    return self.do_get()
                else:
                    raise exc.TimeoutError("QueuePool limit of size %d overflow %d reached, connection timed out, timeout %d" % (self.size(), self.overflow(), self._timeout))

            if self._overflow_lock is not None:
                self._overflow_lock.acquire()

            # Re-check the limit now that the lock (if any) is held.
            if self._max_overflow > -1 and self._overflow >= self._max_overflow:
                if self._overflow_lock is not None:
                    self._overflow_lock.release()
                return self.do_get()

            try:
                con = self.create_connection()
                self._overflow += 1
            finally:
                if self._overflow_lock is not None:
                    self._overflow_lock.release()
            return con

    def dispose(self):
        """Close every record currently queued and reset the overflow count."""
        while True:
            try:
                conn = self._pool.get(False)
                conn.close()
            except Queue.Empty:
                break

        self._overflow = 0 - self.size()
        if self._should_log_info:
            self.log("Pool disposed. " + self.status())

    def status(self):
        tup = (self.size(), self.checkedin(), self.overflow(), self.checkedout())
        return "Pool size: %d  Connections in pool: %d Current Overflow: %d Current Checked out connections: %d" % tup

    def size(self):
        """Return the configured pool size (the queue's ``maxsize``)."""
        return self._pool.maxsize

    def checkedin(self):
        """Return the number of records currently sitting in the queue."""
        return self._pool.qsize()

    def overflow(self):
        """Return the current overflow counter."""
        return self._overflow

    def checkedout(self):
        """Return the number of records currently checked out."""
        return self._pool.maxsize - self._pool.qsize() + self._overflow
673
class NullPool(Pool):
    """A Pool which does not pool connections.

    Instead it literally opens and closes the underlying DB-API connection
    per each connection open/close.

    Reconnect-related functions such as ``recycle`` and connection
    invalidation are not supported by this Pool implementation, since
    no connections are held persistently.

    """

    def status(self):
        return "NullPool"

    def do_return_conn(self, conn):
        # Nothing is kept: a returned record is closed on the spot.
        conn.close()

    def do_return_invalid(self, conn):
        pass

    def do_get(self):
        # Every checkout gets a brand-new record.
        return self.create_connection()

    def recreate(self):
        """Return a new NullPool with this pool's settings."""
        self.log("Pool recreating")

        # Pass the original ``echo`` value and ``reset_on_return`` so the
        # new pool is configured exactly like this one.
        return NullPool(self._creator,
            recycle=self._recycle,
            echo=self.echo,
            use_threadlocal=self._use_threadlocal,
            reset_on_return=self._reset_on_return,
            listeners=self.listeners)

    def dispose(self):
        # No connections are held, so there is nothing to dispose of.
        pass
709
710
class StaticPool(Pool):
    """A Pool of exactly one connection, used for all requests.

    Reconnect-related functions such as ``recycle`` and connection
    invalidation (which is also used to support auto-reconnect) are not
    currently supported by this Pool implementation but may be implemented
    in a future release.

    """

    def __init__(self, creator, **params):
        """
        Construct a StaticPool.

        :param creator: a callable function that returns a DB-API
          connection object.  The function will be called with
          parameters.

        :param echo: If True, connections being pulled and retrieved
          from the pool will be logged to the standard output, as well
          as pool sizing information.  Echoing can also be achieved by
          enabling logging for the "sqlalchemy.pool"
          namespace. Defaults to False.

        :param reset_on_return: If true, reset the database state of
          connections returned to the pool.  This is typically a
          ROLLBACK to release locks and transaction resources.
          Disable at your own peril.  Defaults to True.

        :param listeners: A list of
          :class:`~sqlalchemy.interfaces.PoolListener`-like objects or
          dictionaries of callables that receive events when DB-API
          connections are created, checked out and checked in to the
          pool.

        """
        Pool.__init__(self, creator, **params)
        # The record invokes ``creator`` itself, so it is the only place a
        # connection is opened.  ``connection`` is the record handed out by
        # do_get(); ``_conn`` is the DB-API connection that record opened.
        self.connection = _ConnectionRecord(self)
        self._conn = self.connection.connection

    def status(self):
        return "StaticPool"

    def dispose(self):
        """Close the pool's single connection.

        The close goes through the record, so the connection that is
        actually handed out is the one closed.  Calling this again after
        the pool has been disposed does nothing.
        """
        if self._conn is not None:
            self.connection.close()
            self._conn = None

    def recreate(self):
        """Return a new pool of the same class with this pool's settings."""
        self.log("Pool recreating")
        return self.__class__(creator=self._creator,
                              recycle=self._recycle,
                              use_threadlocal=self._use_threadlocal,
                              reset_on_return=self._reset_on_return,
                              echo=self.echo,
                              listeners=self.listeners)

    def create_connection(self):
        return self._conn

    def do_return_conn(self, conn):
        # The single record is never queued, so there is nothing to do.
        pass

    def do_return_invalid(self, conn):
        pass

    def do_get(self):
        # Every checkout shares the same record.
        return self.connection
778
779
class AssertionPool(Pool):
    """A Pool that allows at most one checked out connection at any given time.

    This will raise an exception if more than one connection is checked out
    at a time.  Useful for debugging code that is using more connections
    than desired.

    """

    ## TODO: modify this to handle an arbitrary connection count.

    def __init__(self, creator, **params):
        """
        Construct an AssertionPool.

        :param creator: a callable function that returns a DB-API
          connection object.  The function will be called with
          parameters.

        :param recycle: If set to non -1, number of seconds between
          connection recycling, which means upon checkout, if this
          timeout is surpassed the connection will be closed and
          replaced with a newly opened connection. Defaults to -1.

        :param echo: If True, connections being pulled and retrieved
          from the pool will be logged to the standard output, as well
          as pool sizing information.  Echoing can also be achieved by
          enabling logging for the "sqlalchemy.pool"
          namespace. Defaults to False.

        :param use_threadlocal: If set to True, repeated calls to
          :meth:`connect` within the same application thread will be
          guaranteed to return the same connection object, if one has
          already been retrieved from the pool and has not been
          returned yet.  Offers a slight performance advantage at the
          cost of individual transactions by default.  The
          :meth:`unique_connection` method is provided to bypass the
          threadlocal behavior installed into :meth:`connect`.

        :param reset_on_return: If true, reset the database state of
          connections returned to the pool.  This is typically a
          ROLLBACK to release locks and transaction resources.
          Disable at your own peril.  Defaults to True.

        :param listeners: A list of
          :class:`~sqlalchemy.interfaces.PoolListener`-like objects or
          dictionaries of callables that receive events when DB-API
          connections are created, checked out and checked in to the
          pool.

        """
        Pool.__init__(self, creator, **params)
        # ``connection`` is the slot emptied on checkout and refilled on
        # checkin; ``_conn`` permanently remembers the one record.
        record = _ConnectionRecord(self)
        self.connection = record
        self._conn = record

    def status(self):
        return "AssertionPool"

    def create_connection(self):
        # The single record is built in __init__; asking for another one
        # is always an error.
        raise AssertionError("Invalid")

    def do_return_conn(self, conn):
        # Only the pool's own record may come back, and only while it is
        # checked out.
        assert conn is self._conn
        assert self.connection is None
        self.connection = conn

    def do_return_invalid(self, conn):
        raise AssertionError("Invalid")

    def do_get(self):
        # Fails when the record is already checked out.
        assert self.connection is not None
        record, self.connection = self.connection, None
        return record
853
class _DBProxy(object):
    """Layers connection pooling behavior on top of a standard DB-API module.

    Proxies a DB-API 2.0 connect() call to a connection pool keyed to the
    specific connect parameters. Other functions and attributes are delegated
    to the underlying DB-API module.
    """

    def __init__(self, module, poolclass=QueuePool, **params):
        """Initializes a new proxy.

        module
          a DB-API 2.0 module

        poolclass
          a Pool class, defaulting to QueuePool

        Other parameters are sent to the Pool object's constructor.
        """

        self.module = module
        self.params = params
        self.poolclass = poolclass
        self.pools = {}
        self._create_pool_mutex = threading.Lock()

    def close(self):
        """Remove every pool from this proxy's registry.

        The pools are only dereferenced here; nothing is called on them.
        """
        # clear() avoids deleting entries while iterating over the keys,
        # which fails wherever keys() is a live view of the dict.
        self.pools.clear()

    def __del__(self):
        self.close()

    def __getattr__(self, key):
        """Delegate unknown attribute lookups to the wrapped DB-API module.

        Raises AttributeError if the proxy has no ``module`` set yet.
        """
        # Read ``module`` straight from the instance dict: going through
        # ``self.module`` would re-enter __getattr__ and recurse without
        # bound when the attribute has not been assigned.
        try:
            module = self.__dict__['module']
        except KeyError:
            raise AttributeError(key)
        return getattr(module, key)

    def get_pool(self, *args, **params):
        """Return the pool for the given connect arguments, creating it if needed.

        The arguments are turned into a key with :meth:`_serialize`; a new
        ``poolclass`` instance is built the first time a key is seen, using
        a creator that calls ``module.connect(*args, **params)``.
        """
        key = self._serialize(*args, **params)
        try:
            return self.pools[key]
        except KeyError:
            pass

        self._create_pool_mutex.acquire()
        try:
            # Look again while holding the mutex: the pool may have been
            # stored between the lookup above and acquiring the lock.
            if key in self.pools:
                return self.pools[key]
            pool = self.poolclass(
                lambda: self.module.connect(*args, **params), **self.params)
            self.pools[key] = pool
            return pool
        finally:
            self._create_pool_mutex.release()

    def connect(self, *args, **params):
        """Activate a connection to the database.

        Connect to the database using this DBProxy's module and the given
        connect arguments.  If the arguments match an existing pool, the
        connection will be returned from the pool's current thread-local
        connection instance, or if there is no thread-local connection
        instance it will be checked out from the set of pooled connections.

        If the pool has no available connections and allows new connections
        to be created, a new database connection will be made.
        """

        return self.get_pool(*args, **params).connect()

    def dispose(self, *args, **params):
        """Dispose the connection pool referenced by the given connect arguments."""

        # A key with no pool is ignored.
        self.pools.pop(self._serialize(*args, **params), None)

    def _serialize(self, *args, **params):
        """Return a pickled key identifying the given connect arguments.

        Keyword arguments are pickled as a list of ``(name, value)`` pairs
        sorted by name, so the key does not depend on the order in which
        the keywords were supplied.  Names are unique, so the sort never
        has to compare values.
        """
        return pickle.dumps([args, sorted(params.items())])
Note: リポジトリブラウザについてのヘルプは TracBrowser を参照してください。