root/galaxy-central/eggs/SQLAlchemy-0.5.6_dev_r6498-py2.6.egg/sqlalchemy/engine/strategies.py @ 3

リビジョン 3, 6.8 KB (コミッタ: kohda, 14 年 前)

Install Unix tools  http://hannonlab.cshl.edu/galaxy_unix_tools/galaxy.html

行番号 
1"""Strategies for creating new instances of Engine types.
2
3These are semi-private implementation classes which provide the
4underlying behavior for the "strategy" keyword argument available on
5:func:`~sqlalchemy.engine.create_engine`.  Currently available options are
6``plain``, ``threadlocal``, and ``mock``.
7
8New strategies can be added via new ``EngineStrategy`` classes.
9
10"""
11from operator import attrgetter
12
13from sqlalchemy.engine import base, threadlocal, url
14from sqlalchemy import util, exc
15from sqlalchemy import pool as poollib
16
17
# Registry of strategy instances keyed by strategy name; each
# EngineStrategy adds itself here when it is constructed.
strategies = {}


class EngineStrategy(object):
    """An adaptor that turns input arguments into an Engine.

    Subclasses implement ``create``, which receives the input arguments
    and produces an instance of base.Engine or a subclass.
    """

    def __init__(self, name):
        """Construct a new EngineStrategy object.

        The instance stores ``name`` and registers itself in the
        module-level ``strategies`` dict under that name, replacing any
        entry already stored there.
        """
        self.name = name
        strategies[self.name] = self

    def create(self, *args, **kwargs):
        """Given arguments, return a new Engine instance.

        This base implementation always raises NotImplementedError.
        """
        raise NotImplementedError()
40
class DefaultEngineStrategy(EngineStrategy):
    """Base class for built-in strategies.

    ``create`` assembles a dialect, a connection pool and an engine from
    a database URL plus keyword arguments.  Subclasses supply the engine
    class through ``get_engine_cls()`` and the pool's default
    ``use_threadlocal`` value through ``pool_threadlocal()``.
    """

    @staticmethod
    def _consume(names, kwargs, translate=None):
        """Pop the entries of ``kwargs`` that correspond to ``names``.

        ``names`` is an iterable of target argument names.  ``translate``
        optionally maps a target name to the key under which it is
        looked up in ``kwargs``; names without a mapping are looked up
        unchanged.  Matching entries are removed from ``kwargs`` and
        returned in a new dict keyed by the target name.
        """
        translate = translate or {}
        consumed = {}
        for name in names:
            source = translate.get(name, name)
            if source in kwargs:
                consumed[name] = kwargs.pop(source)
        return consumed

    def create(self, name_or_url, **kwargs):
        """Build and return an engine for ``name_or_url``.

        Keyword arguments are consumed in turn by the dialect class, the
        DBAPI factory (unless ``module`` is given), the pool class
        (unless ``pool`` is given) and the engine class.

        Recognised special keys: ``module``, ``connect_args``, ``pool``,
        ``creator`` and ``poolclass``.  ``creator`` and ``poolclass`` are
        only consumed when no ``pool`` is passed.

        Raises TypeError if any keyword argument is left unconsumed.
        """
        # create url.URL object
        u = url.make_url(name_or_url)

        dialect_cls = u.get_dialect()

        # consume dialect arguments from kwargs
        dialect_args = self._consume(util.get_cls_kwargs(dialect_cls), kwargs)

        # an explicitly passed DBAPI module wins; otherwise let the
        # dialect class import/build it, forwarding any arguments its
        # dbapi() factory accepts
        dbapi = kwargs.pop('module', None)
        if dbapi is None:
            dbapi_args = self._consume(
                util.get_func_kwargs(dialect_cls.dbapi), kwargs)
            dbapi = dialect_cls.dbapi(**dbapi_args)

        dialect_args['dbapi'] = dbapi

        # create dialect
        dialect = dialect_cls(**dialect_args)

        # assemble connection arguments; explicit connect_args override
        # the values derived from the URL
        (cargs, cparams) = dialect.create_connect_args(u)
        cparams.update(kwargs.pop('connect_args', {}))

        # look for existing pool or create
        pool = kwargs.pop('pool', None)
        if pool is None:
            def connect():
                try:
                    return dbapi.connect(*cargs, **cparams)
                except Exception as e:
                    # "as" form: valid on Python 2.6+ and Python 3
                    raise exc.DBAPIError.instance(None, None, e)
            creator = kwargs.pop('creator', connect)

            poolclass = (kwargs.pop('poolclass', None) or
                         getattr(dialect_cls, 'poolclass', poollib.QueuePool))

            # consume pool arguments from kwargs, translating a few of
            # the arguments: the pool's own parameter name (left) is
            # supplied to create_engine() under the name on the right
            translate = {'echo': 'echo_pool',
                         'timeout': 'pool_timeout',
                         'recycle': 'pool_recycle',
                         'use_threadlocal': 'pool_threadlocal'}
            pool_args = self._consume(
                util.get_cls_kwargs(poolclass), kwargs, translate)
            pool_args.setdefault('use_threadlocal', self.pool_threadlocal())
            pool = poolclass(creator, **pool_args)
        elif isinstance(pool, poollib._DBProxy):
            # a proxy hands out the pool matching these connect arguments
            pool = pool.get_pool(*cargs, **cparams)

        # create engine.
        engineclass = self.get_engine_cls()
        engine_args = self._consume(util.get_cls_kwargs(engineclass), kwargs)

        # all kwargs should be consumed
        if kwargs:
            raise TypeError(
                "Invalid argument(s) %s sent to create_engine(), "
                "using configuration %s/%s/%s.  Please check that the "
                "keyword arguments are appropriate for this combination "
                "of components." % (','.join("'%s'" % k for k in kwargs),
                                    dialect.__class__.__name__,
                                    pool.__class__.__name__,
                                    engineclass.__name__))
        return engineclass(pool, dialect, u, **engine_args)

    def pool_threadlocal(self):
        """Return the default ``use_threadlocal`` value for a new pool.

        Must be overridden; this base implementation raises
        NotImplementedError.
        """
        raise NotImplementedError()

    def get_engine_cls(self):
        """Return the engine class that ``create`` instantiates.

        Must be overridden; this base implementation raises
        NotImplementedError.
        """
        raise NotImplementedError()
129
class PlainEngineStrategy(DefaultEngineStrategy):
    """Strategy that configures a regular ``base.Engine``.

    Registered under the name ``plain``.
    """

    def __init__(self):
        DefaultEngineStrategy.__init__(self, 'plain')

    def get_engine_cls(self):
        """Return ``base.Engine`` as the engine class to build."""
        return base.Engine

    def pool_threadlocal(self):
        """Return False: pools default to ``use_threadlocal=False``."""
        return False


# Instantiating the strategy registers it in ``strategies``.
PlainEngineStrategy()
143
class ThreadLocalEngineStrategy(DefaultEngineStrategy):
    """Strategy that configures an Engine with threadlocal behavior.

    Registered under the name ``threadlocal``.
    """

    def __init__(self):
        DefaultEngineStrategy.__init__(self, 'threadlocal')

    def get_engine_cls(self):
        """Return ``threadlocal.TLEngine`` as the engine class to build."""
        return threadlocal.TLEngine

    def pool_threadlocal(self):
        """Return True: pools default to ``use_threadlocal=True``."""
        return True


# Instantiating the strategy registers it in ``strategies``.
ThreadLocalEngineStrategy()
157
158
class MockEngineStrategy(EngineStrategy):
    """Strategy that builds an Engine-like object with mocked execution.

    ``create`` returns a single mock Connectable whose ``execute`` is a
    function supplied by the caller.  Registered under the name ``mock``.
    """

    def __init__(self):
        EngineStrategy.__init__(self, 'mock')

    def create(self, name_or_url, executor, **kwargs):
        """Return a MockConnection for the dialect named by ``name_or_url``.

        Keyword arguments accepted by the dialect class are removed from
        ``kwargs`` and passed to its constructor; any others are not
        used.  ``executor`` becomes the connection's ``execute``.
        """
        parsed_url = url.make_url(name_or_url)
        dialect_cls = parsed_url.get_dialect()

        # pull out of kwargs only what the dialect constructor accepts
        dialect_args = dict(
            (key, kwargs.pop(key))
            for key in util.get_cls_kwargs(dialect_cls)
            if key in kwargs)

        mock_dialect = dialect_cls(**dialect_args)
        return MockEngineStrategy.MockConnection(mock_dialect, executor)

    class MockConnection(base.Connectable):
        """Connectable that acts as its own engine and connection."""

        def __init__(self, dialect, execute):
            self._dialect = dialect
            # instance attribute; takes precedence over the execute()
            # method defined on the class below
            self.execute = execute

        @property
        def engine(self):
            # the mock connection is its own engine
            return self

        @property
        def dialect(self):
            return self._dialect

        @property
        def name(self):
            return self._dialect.name

        def contextual_connect(self, **kwargs):
            # no separate connection object exists; hand back ourselves
            return self

        def compiler(self, statement, parameters, **kwargs):
            return self._dialect.compiler(
                statement, parameters, engine=self, **kwargs)

        def create(self, entity, **kwargs):
            # checkfirst is always forced to False
            options = dict(kwargs, checkfirst=False)
            active_dialect = self.dialect
            generator = active_dialect.schemagenerator(
                active_dialect, self, **options)
            generator.traverse(entity)

        def drop(self, entity, **kwargs):
            # checkfirst is always forced to False
            options = dict(kwargs, checkfirst=False)
            active_dialect = self.dialect
            dropper = active_dialect.schemadropper(
                active_dialect, self, **options)
            dropper.traverse(entity)

        def execute(self, object, *multiparams, **params):
            raise NotImplementedError()


# Instantiating the strategy registers it in ``strategies``.
MockEngineStrategy()
Note: リポジトリブラウザについてのヘルプは TracBrowser を参照してください。