| 1 | # shard.py |
|---|
| 2 | # Copyright (C) the SQLAlchemy authors and contributors |
|---|
| 3 | # |
|---|
| 4 | # This module is part of SQLAlchemy and is released under |
|---|
| 5 | # the MIT License: http://www.opensource.org/licenses/mit-license.php |
|---|
| 6 | |
|---|
| 7 | """Horizontal sharding support. |
|---|
| 8 | |
|---|
| 9 | Defines a rudimentary 'horizontal sharding' system which allows a Session to |
|---|
| 10 | distribute queries and persistence operations across multiple databases. |
|---|
| 11 | |
|---|
| 12 | For a usage example, see the file ``examples/sharding/attribute_shard.py`` |
|---|
| 13 | included in the source distribution. |
|---|
| 14 | |
|---|
| 15 | """ |
|---|
| 16 | |
|---|
| 17 | import sqlalchemy.exceptions as sa_exc |
|---|
| 18 | from sqlalchemy import util |
|---|
| 19 | from sqlalchemy.orm.session import Session |
|---|
| 20 | from sqlalchemy.orm.query import Query |
|---|
| 21 | |
|---|
# Public names exported by ``from ... import *``.
__all__ = [
    'ShardedSession',
    'ShardedQuery',
]
|---|
| 23 | |
|---|
| 24 | |
|---|
class ShardedSession(Session):
    """Session that distributes its work across several shard binds.

    Each shard is identified by a shard id and backed by the bind that was
    registered for it with ``bind_shard()``.  User-supplied chooser
    callables decide which shard(s) a given operation is routed to.
    """

    def __init__(self, shard_chooser, id_chooser, query_chooser, shards=None, **kwargs):
        """Construct a ShardedSession.

        shard_chooser
          A callable which, passed a Mapper, a mapped instance, and possibly a
          SQL clause, returns a shard ID.  This id may be based off of the
          attributes present within the object, or on some round-robin
          scheme.  If the scheme is based on a selection, it should set
          whatever state on the instance to mark it in the future as
          participating in that shard.

        id_chooser
          A callable, passed a query and a tuple of identity values, which
          should return a list of shard ids where the ID might reside.  The
          databases will be queried in the order of this listing.

        query_chooser
          For a given Query, returns the list of shard_ids where the query
          should be issued.  Results from all shards returned will be
          combined together into a single listing.

        shards
          Optional mapping of shard id to bind; every key is registered
          with ``bind_shard(key, shards[key])``.

        Any remaining keyword arguments are passed to ``Session.__init__``.
        """
        super(ShardedSession, self).__init__(**kwargs)
        self.shard_chooser = shard_chooser
        self.id_chooser = id_chooser
        self.query_chooser = query_chooser
        # Private (name-mangled) registry: shard id -> bind.
        self.__binds = {}
        # Connections needed while flushing are requested through
        # connection(), so each flushed object can be routed to its shard
        # (presumably read by Session's flush machinery -- TODO confirm).
        self._mapper_flush_opts = {'connection_callable': self.connection}
        # Presumably the class Session uses to build queries; ShardedQuery
        # reads the chooser callables back from its session.
        self._query_cls = ShardedQuery
        if shards is None:
            return
        for shard_id in shards:
            self.bind_shard(shard_id, shards[shard_id])

    def connection(self, mapper=None, instance=None, shard_id=None, **kwargs):
        """Return a connection to a single shard.

        If ``shard_id`` is not given it is computed as
        ``shard_chooser(mapper, instance)``.  While ``self.transaction`` is
        set, the connection comes from ``self.transaction.connection()``
        (``kwargs`` are NOT forwarded in that case).  Otherwise the shard's
        bind is looked up via ``get_bind()`` and
        ``contextual_connect(**kwargs)`` is called on it.
        """
        if shard_id is None:
            shard_id = self.shard_chooser(mapper, instance)

        if self.transaction is None:
            bind = self.get_bind(mapper, shard_id=shard_id, instance=instance)
            return bind.contextual_connect(**kwargs)
        return self.transaction.connection(mapper, shard_id=shard_id)

    def get_bind(self, mapper, shard_id=None, instance=None, clause=None, **kw):
        """Return the bind registered for a shard.

        When ``shard_id`` is None it is chosen by
        ``shard_chooser(mapper, instance, clause=clause)``.  Raises
        ``KeyError`` if no bind was registered for the resulting shard id.
        Extra keyword arguments are accepted and ignored.
        """
        chosen = shard_id if shard_id is not None else self.shard_chooser(mapper, instance, clause=clause)
        return self.__binds[chosen]

    def bind_shard(self, shard_id, bind):
        """Associate ``bind`` with ``shard_id``, replacing any earlier binding."""
        self.__binds[shard_id] = bind
|---|
| 75 | |
|---|
class ShardedQuery(Query):
    """A Query that issues its SQL against one or more shards.

    Unless pinned with ``set_shard()``, the session's ``query_chooser``
    decides which shards a query runs against, and the session's
    ``id_chooser`` decides which shards ``get()``/``load()`` probe.
    """

    def __init__(self, *args, **kwargs):
        super(ShardedQuery, self).__init__(*args, **kwargs)
        # Chooser callables are taken from the owning ShardedSession.
        self.id_chooser = self.session.id_chooser
        self.query_chooser = self.session.query_chooser
        # None means "not pinned to a shard".
        self._shard_id = None

    def set_shard(self, shard_id):
        """Return a new query, limited to a single shard ID.

        All subsequent operations with the returned query will be against
        the single shard regardless of other state.
        """
        q = self._clone()
        q._shard_id = shard_id
        return q

    def _execute_and_instances(self, context):
        """Execute ``context.statement`` and return an iterator of results.

        A query pinned with ``set_shard()`` runs on that shard only and
        returns ``self.instances()`` directly.  Otherwise the statement is
        executed on every shard returned by ``query_chooser``, in order, and
        the per-shard results are concatenated.  The combined list is not
        re-sorted, so any ordering/limit in the statement applies per shard.
        """
        if self._shard_id is not None:
            result = self.session.connection(mapper=self._mapper_zero(), shard_id=self._shard_id).execute(context.statement, self._params)
            return self.instances(result, context)

        partial = []
        for shard_id in self.query_chooser(self):
            result = self.session.connection(mapper=self._mapper_zero(), shard_id=shard_id).execute(context.statement, self._params)
            # Extend in place; the former ``partial = partial + [...]``
            # re-copied the accumulated list once per shard.
            partial.extend(self.instances(result, context))
        # if some kind of in memory 'sorting' were done, this is where it would happen
        return iter(partial)

    def get(self, ident, **kwargs):
        """Return the instance with primary key ``ident``, or None.

        When pinned to a shard, delegates to ``Query.get()`` on that shard.
        Otherwise ``ident`` is normalized with ``util.to_list`` and each
        shard returned by ``id_chooser(self, ident)`` is tried in order; the
        first non-None result wins.  Extra keyword arguments are forwarded
        to ``Query.get()``.
        """
        if self._shard_id is not None:
            # Forward kwargs: the per-shard calls below arrive here, and
            # they used to be silently dropped.
            return super(ShardedQuery, self).get(ident, **kwargs)

        ident = util.to_list(ident)
        for shard_id in self.id_chooser(self, ident):
            o = self.set_shard(shard_id).get(ident, **kwargs)
            if o is not None:
                return o
        return None

    def load(self, ident, **kwargs):
        """Return the instance with primary key ``ident``.

        When pinned to a shard, delegates to ``Query.load()`` on that shard
        with all keyword arguments.  Otherwise each shard returned by
        ``id_chooser`` is probed in order with ``raiseerr=False``, and the
        first non-None result is returned.

        Raises ``InvalidRequestError`` when no shard has the instance,
        unless the caller passed ``raiseerr=False``, in which case None is
        returned.
        """
        if self._shard_id is not None:
            # Forward kwargs -- in particular raiseerr=False from the loop
            # below -- so that a miss on one shard returns None instead of
            # raising before the remaining shards are tried.
            return super(ShardedQuery, self).load(ident, **kwargs)

        # Pulled out so it can't collide with the raiseerr=False passed to
        # each shard below (previously a duplicate-keyword TypeError).
        raiseerr = kwargs.pop('raiseerr', True)
        # Normalize like get() so id_chooser sees the same form from both.
        ident_list = util.to_list(ident)
        for shard_id in self.id_chooser(self, ident_list):
            o = self.set_shard(shard_id).load(ident_list, raiseerr=False, **kwargs)
            if o is not None:
                return o
        if raiseerr:
            raise sa_exc.InvalidRequestError("No instance found for identity %s" % repr(ident))
        return None
|---|