root/galaxy-central/eggs/SQLAlchemy-0.5.6_dev_r6498-py2.6.egg/sqlalchemy/orm/shard.py

リビジョン 3, 5.0 KB (コミッタ: kohda, 14 年 前)

Install Unix tools  http://hannonlab.cshl.edu/galaxy_unix_tools/galaxy.html

行番号 
1# shard.py
2# Copyright (C) the SQLAlchemy authors and contributors
3#
4# This module is part of SQLAlchemy and is released under
5# the MIT License: http://www.opensource.org/licenses/mit-license.php
6
7"""Horizontal sharding support.
8
9Defines a rudimental 'horizontal sharding' system which allows a Session to
10distribute queries and persistence operations across multiple databases.
11
12For a usage example, see the file ``examples/sharding/attribute_shard.py``
13included in the source distrbution.
14
15"""
16
17import sqlalchemy.exceptions as sa_exc
18from sqlalchemy import util
19from sqlalchemy.orm.session import Session
20from sqlalchemy.orm.query import Query
21
22__all__ = ['ShardedSession', 'ShardedQuery']
23
24
25class ShardedSession(Session):
26    def __init__(self, shard_chooser, id_chooser, query_chooser, shards=None, **kwargs):
27        """Construct a ShardedSession.
28
29        shard_chooser
30          A callable which, passed a Mapper, a mapped instance, and possibly a
31          SQL clause, returns a shard ID.  This id may be based off of the
32          attributes present within the object, or on some round-robin
33          scheme. If the scheme is based on a selection, it should set
34          whatever state on the instance to mark it in the future as
35          participating in that shard.
36
37        id_chooser
38          A callable, passed a query and a tuple of identity values, which
39          should return a list of shard ids where the ID might reside.  The
40          databases will be queried in the order of this listing.
41
42        query_chooser
43          For a given Query, returns the list of shard_ids where the query
44          should be issued.  Results from all shards returned will be combined
45          together into a single listing.
46
47        """
48        super(ShardedSession, self).__init__(**kwargs)
49        self.shard_chooser = shard_chooser
50        self.id_chooser = id_chooser
51        self.query_chooser = query_chooser
52        self.__binds = {}
53        self._mapper_flush_opts = {'connection_callable':self.connection}
54        self._query_cls = ShardedQuery
55        if shards is not None:
56            for k in shards:
57                self.bind_shard(k, shards[k])
58       
59    def connection(self, mapper=None, instance=None, shard_id=None, **kwargs):
60        if shard_id is None:
61            shard_id = self.shard_chooser(mapper, instance)
62
63        if self.transaction is not None:
64            return self.transaction.connection(mapper, shard_id=shard_id)
65        else:
66            return self.get_bind(mapper, shard_id=shard_id, instance=instance).contextual_connect(**kwargs)
67   
68    def get_bind(self, mapper, shard_id=None, instance=None, clause=None, **kw):
69        if shard_id is None:
70            shard_id = self.shard_chooser(mapper, instance, clause=clause)
71        return self.__binds[shard_id]
72
73    def bind_shard(self, shard_id, bind):
74        self.__binds[shard_id] = bind
75
76class ShardedQuery(Query):
77    def __init__(self, *args, **kwargs):
78        super(ShardedQuery, self).__init__(*args, **kwargs)
79        self.id_chooser = self.session.id_chooser
80        self.query_chooser = self.session.query_chooser
81        self._shard_id = None
82       
83    def set_shard(self, shard_id):
84        """return a new query, limited to a single shard ID.
85       
86        all subsequent operations with the returned query will
87        be against the single shard regardless of other state.
88        """
89       
90        q = self._clone()
91        q._shard_id = shard_id
92        return q
93       
94    def _execute_and_instances(self, context):
95        if self._shard_id is not None:
96            result = self.session.connection(mapper=self._mapper_zero(), shard_id=self._shard_id).execute(context.statement, self._params)
97            return self.instances(result, context)
98        else:
99            partial = []
100            for shard_id in self.query_chooser(self):
101                result = self.session.connection(mapper=self._mapper_zero(), shard_id=shard_id).execute(context.statement, self._params)
102                partial = partial + list(self.instances(result, context))
103            # if some kind of in memory 'sorting' were done, this is where it would happen
104            return iter(partial)
105
106    def get(self, ident, **kwargs):
107        if self._shard_id is not None:
108            return super(ShardedQuery, self).get(ident)
109        else:
110            ident = util.to_list(ident)
111            for shard_id in self.id_chooser(self, ident):
112                o = self.set_shard(shard_id).get(ident, **kwargs)
113                if o is not None:
114                    return o
115            else:
116                return None
117   
118    def load(self, ident, **kwargs):
119        if self._shard_id is not None:
120            return super(ShardedQuery, self).load(ident)
121        else:
122            for shard_id in self.id_chooser(self, ident):
123                o = self.set_shard(shard_id).load(ident, raiseerr=False, **kwargs)
124                if o is not None:
125                    return o
126            else:
127                raise sa_exc.InvalidRequestError("No instance found for identity %s" % repr(ident))
Note: リポジトリブラウザについてのヘルプは TracBrowser を参照してください。