1 | # shard.py |
---|
2 | # Copyright (C) the SQLAlchemy authors and contributors |
---|
3 | # |
---|
4 | # This module is part of SQLAlchemy and is released under |
---|
5 | # the MIT License: http://www.opensource.org/licenses/mit-license.php |
---|
6 | |
---|
7 | """Horizontal sharding support. |
---|
8 | |
---|
9 | Defines a rudimentary 'horizontal sharding' system which allows a Session to |
---|
10 | distribute queries and persistence operations across multiple databases. |
---|
11 | |
---|
12 | For a usage example, see the file ``examples/sharding/attribute_shard.py`` |
---|
13 | included in the source distribution. |
---|
14 | |
---|
15 | """ |
---|
16 | |
---|
17 | import sqlalchemy.exceptions as sa_exc |
---|
18 | from sqlalchemy import util |
---|
19 | from sqlalchemy.orm.session import Session |
---|
20 | from sqlalchemy.orm.query import Query |
---|
21 | |
---|
22 | __all__ = ['ShardedSession', 'ShardedQuery'] |
---|
23 | |
---|
24 | |
---|
class ShardedSession(Session):
    """A Session that resolves connections and binds per shard id.

    Binds are registered under a shard id with ``bind_shard()``; the
    user-supplied ``shard_chooser`` callable picks the shard id whenever
    one is not given explicitly.
    """

    def __init__(self, shard_chooser, id_chooser, query_chooser, shards=None, **kwargs):
        """Construct a ShardedSession.

        shard_chooser
          A callable which, passed a Mapper, a mapped instance, and possibly
          a SQL clause (as the ``clause`` keyword), returns a shard ID.  The
          id may be derived from attributes present on the object, or from
          some round-robin scheme.  If the scheme is based on a selection,
          the callable should record whatever state is needed on the
          instance so that it keeps participating in that shard later on.

        id_chooser
          A callable, passed a query and a tuple of identity values, which
          should return a list of shard ids where the ID might reside.  The
          databases will be queried in the order of this listing.

        query_chooser
          For a given Query, returns the list of shard_ids where the query
          should be issued.  Results from all shards returned will be
          combined together into a single listing.

        shards
          Optional mapping of shard id to bind; every entry is registered
          through ``bind_shard()``.

        Any remaining keyword arguments are handed to the ``Session``
        constructor unchanged.

        """
        super(ShardedSession, self).__init__(**kwargs)

        # the three user-supplied routing callables
        self.shard_chooser = shard_chooser
        self.id_chooser = id_chooser
        self.query_chooser = query_chooser

        # shard id -> bind, filled in by bind_shard()
        self.__binds = {}

        # expose connection() under the 'connection_callable' flush option
        # NOTE(review): presumably consumed by the mapper flush machinery --
        # the consumer is not visible in this module.
        self._mapper_flush_opts = {'connection_callable': self.connection}

        # queries created by this session are ShardedQuery instances
        self._query_cls = ShardedQuery

        if shards is None:
            return
        for shard_key in shards:
            self.bind_shard(shard_key, shards[shard_key])

    def connection(self, mapper=None, instance=None, shard_id=None, **kwargs):
        """Return a connection for the given (or chosen) shard.

        When ``shard_id`` is None it is obtained from
        ``shard_chooser(mapper, instance)``.  If the session has a
        transaction, the connection comes from that transaction; otherwise
        the shard's bind is looked up with ``get_bind()`` and
        ``contextual_connect(**kwargs)`` is called on it.
        """
        if shard_id is None:
            shard_id = self.shard_chooser(mapper, instance)

        current_trans = self.transaction
        if current_trans is None:
            shard_bind = self.get_bind(mapper, shard_id=shard_id, instance=instance)
            return shard_bind.contextual_connect(**kwargs)
        return current_trans.connection(mapper, shard_id=shard_id)

    def get_bind(self, mapper, shard_id=None, instance=None, clause=None, **kw):
        """Return the bind registered for a shard.

        When ``shard_id`` is None it is obtained from
        ``shard_chooser(mapper, instance, clause=clause)``.  Extra keyword
        arguments are accepted and ignored.  An unregistered shard id
        results in a ``KeyError`` from the dictionary lookup.
        """
        chosen = shard_id
        if chosen is None:
            chosen = self.shard_chooser(mapper, instance, clause=clause)
        return self.__binds[chosen]

    def bind_shard(self, shard_id, bind):
        """Register ``bind`` under ``shard_id``, replacing any previous entry."""
        self.__binds[shard_id] = bind
---|
75 | |
---|
class ShardedQuery(Query):
    """A Query that can run against one shard or fan out over several.

    The ``id_chooser`` and ``query_chooser`` callables are copied from the
    owning session at construction time.  A query is unrestricted until
    ``set_shard()`` pins it to a single shard id.
    """

    def __init__(self, *args, **kwargs):
        """Construct the query and pick up the session's chooser callables."""
        super(ShardedQuery, self).__init__(*args, **kwargs)
        self.id_chooser = self.session.id_chooser
        self.query_chooser = self.session.query_chooser
        # None means "not pinned": shards are selected by the choosers
        self._shard_id = None

    def set_shard(self, shard_id):
        """return a new query, limited to a single shard ID.

        all subsequent operations with the returned query will
        be against the single shard regardless of other state.
        """

        q = self._clone()
        q._shard_id = shard_id
        return q

    def _execute_and_instances(self, context):
        """Execute ``context.statement`` and return an iterator of instances.

        A pinned query executes on its single shard and returns whatever
        ``instances()`` produces.  An unpinned query executes on every shard
        id returned by ``query_chooser(self)``, in that order, and returns
        an iterator over the concatenated results.
        """
        if self._shard_id is not None:
            conn = self.session.connection(
                mapper=self._mapper_zero(), shard_id=self._shard_id)
            result = conn.execute(context.statement, self._params)
            return self.instances(result, context)

        partial = []
        for shard_id in self.query_chooser(self):
            conn = self.session.connection(
                mapper=self._mapper_zero(), shard_id=shard_id)
            result = conn.execute(context.statement, self._params)
            # extend in place; rebuilding the list with "+" for every shard
            # copies all previously collected rows again
            partial.extend(self.instances(result, context))
        # if some kind of in memory 'sorting' were done, this is where it would happen
        return iter(partial)

    def get(self, ident, **kwargs):
        """Return the instance for ``ident``, or None if no shard has it.

        A pinned query delegates to the base ``get()``.  Otherwise the
        identity is normalised to a list, ``id_chooser(self, ident)``
        supplies the candidate shard ids, and each is tried in order until
        one returns a non-None instance.
        """
        if self._shard_id is not None:
            # NOTE(review): **kwargs is deliberately not forwarded here; the
            # base get() signature is not visible in this module, so the
            # existing behaviour is kept.
            return super(ShardedQuery, self).get(ident)

        ident = util.to_list(ident)
        for shard_id in self.id_chooser(self, ident):
            o = self.set_shard(shard_id).get(ident, **kwargs)
            if o is not None:
                return o
        return None

    def load(self, ident, **kwargs):
        """Load the instance for ``ident``, searching the candidate shards.

        A pinned query delegates to the base ``load()``, forwarding
        ``**kwargs``.  Otherwise ``id_chooser`` supplies the candidate shard
        ids and each is tried in order with ``raiseerr=False``.

        Raises ``InvalidRequestError`` when no candidate shard yields an
        instance.
        """
        if self._shard_id is not None:
            # Forward the keyword arguments: the multi-shard loop below
            # passes raiseerr=False to each per-shard query, and dropping it
            # here would make a miss on the first shard raise instead of
            # letting the search continue with the next shard.
            return super(ShardedQuery, self).load(ident, **kwargs)

        # id_chooser is handed a list/tuple of identity values, as in get()
        for shard_id in self.id_chooser(self, util.to_list(ident)):
            o = self.set_shard(shard_id).load(ident, raiseerr=False, **kwargs)
            if o is not None:
                return o
        raise sa_exc.InvalidRequestError("No instance found for identity %s" % repr(ident))
---|