1 | """ |
---|
2 | This module defines standalone schema constraint classes. |
---|
3 | """ |
---|
4 | import sqlalchemy |
---|
5 | from sqlalchemy import schema |
---|
6 | |
---|
7 | |
---|
8 | class ConstraintChangeset(object): |
---|
9 | """Base class for Constraint classes. |
---|
10 | """ |
---|
11 | |
---|
12 | def _normalize_columns(self, cols, fullname=False): |
---|
13 | """Given: column objects or names; return col names and |
---|
14 | (maybe) a table""" |
---|
15 | colnames = [] |
---|
16 | table = None |
---|
17 | for col in cols: |
---|
18 | if isinstance(col, schema.Column): |
---|
19 | if col.table is not None and table is None: |
---|
20 | table = col.table |
---|
21 | if fullname: |
---|
22 | col = '.'.join((col.table.name, col.name)) |
---|
23 | else: |
---|
24 | col = col.name |
---|
25 | colnames.append(col) |
---|
26 | return colnames, table |
---|
27 | |
---|
28 | def create(self, engine=None): |
---|
29 | """Create the constraint in the database. |
---|
30 | |
---|
31 | :param engine: the database engine to use. If this is |
---|
32 | :keyword:`None` the instance's engine will be used |
---|
33 | :type engine: :class:`sqlalchemy.engine.base.Engine` |
---|
34 | """ |
---|
35 | if engine is None: |
---|
36 | engine = self.engine |
---|
37 | engine.create(self) |
---|
38 | |
---|
39 | def drop(self, engine=None): |
---|
40 | """Drop the constraint from the database. |
---|
41 | |
---|
42 | :param engine: the database engine to use. If this is |
---|
43 | :keyword:`None` the instance's engine will be used |
---|
44 | :type engine: :class:`sqlalchemy.engine.base.Engine` |
---|
45 | """ |
---|
46 | if engine is None: |
---|
47 | engine = self.engine |
---|
48 | engine.drop(self) |
---|
49 | |
---|
50 | def _derived_metadata(self): |
---|
51 | return self.table._derived_metadata() |
---|
52 | |
---|
53 | def accept_schema_visitor(self, visitor, *p, **k): |
---|
54 | """ |
---|
55 | :raises: :exc:`NotImplementedError` if this method is not \ |
---|
56 | overridden by a subclass |
---|
57 | """ |
---|
58 | raise NotImplementedError() |
---|
59 | |
---|
60 | def _accept_schema_visitor(self, visitor, func, *p, **k): |
---|
61 | """Call the visitor only if it defines the given function""" |
---|
62 | try: |
---|
63 | func = getattr(visitor, func) |
---|
64 | except AttributeError: |
---|
65 | return |
---|
66 | return func(self) |
---|
67 | |
---|
68 | def autoname(self): |
---|
69 | """Automatically generate a name for the constraint instance. |
---|
70 | |
---|
71 | Subclasses must implement this method. |
---|
72 | |
---|
73 | :raises: :exc:`NotImplementedError` if this method is not \ |
---|
74 | overridden by a subclass |
---|
75 | """ |
---|
76 | raise NotImplementedError() |
---|
77 | |
---|
78 | |
---|
79 | def _engine_run_visitor(engine, visitorcallable, element, **kwargs): |
---|
80 | conn = engine.connect() |
---|
81 | try: |
---|
82 | element.accept_schema_visitor(visitorcallable(conn)) |
---|
83 | finally: |
---|
84 | conn.close() |
---|
85 | |
---|
86 | |
---|
87 | class PrimaryKeyConstraint(ConstraintChangeset, schema.PrimaryKeyConstraint): |
---|
88 | """Primary key constraint class.""" |
---|
89 | |
---|
90 | def __init__(self, *cols, **kwargs): |
---|
91 | colnames, table = self._normalize_columns(cols) |
---|
92 | table = kwargs.pop('table', table) |
---|
93 | super(PrimaryKeyConstraint, self).__init__(*colnames, **kwargs) |
---|
94 | if table is not None: |
---|
95 | self._set_parent(table) |
---|
96 | |
---|
97 | def _set_parent(self, table): |
---|
98 | self.table = table |
---|
99 | return super(ConstraintChangeset, self)._set_parent(table) |
---|
100 | |
---|
101 | def create(self, *args, **kwargs): |
---|
102 | from migrate.changeset.databases.visitor import get_engine_visitor |
---|
103 | visitorcallable = get_engine_visitor(self.table.bind, |
---|
104 | 'constraintgenerator') |
---|
105 | _engine_run_visitor(self.table.bind, visitorcallable, self) |
---|
106 | |
---|
107 | def autoname(self): |
---|
108 | """Mimic the database's automatic constraint names""" |
---|
109 | ret = "%(table)s_pkey"%dict( |
---|
110 | table=self.table.name, |
---|
111 | ) |
---|
112 | return ret |
---|
113 | |
---|
114 | def drop(self, *args, **kwargs): |
---|
115 | from migrate.changeset.databases.visitor import get_engine_visitor |
---|
116 | visitorcallable = get_engine_visitor(self.table.bind, |
---|
117 | 'constraintdropper') |
---|
118 | _engine_run_visitor(self.table.bind, visitorcallable, self) |
---|
119 | self.columns.clear() |
---|
120 | return self |
---|
121 | |
---|
122 | def accept_schema_visitor(self, visitor, *p, **k): |
---|
123 | func = 'visit_migrate_primary_key_constraint' |
---|
124 | return self._accept_schema_visitor(visitor, func, *p, **k) |
---|
125 | |
---|
126 | |
---|
127 | class ForeignKeyConstraint(ConstraintChangeset, schema.ForeignKeyConstraint): |
---|
128 | """Foreign key constraint class.""" |
---|
129 | |
---|
130 | def __init__(self, columns, refcolumns, *p, **k): |
---|
131 | colnames, table = self._normalize_columns(columns) |
---|
132 | table = k.pop('table', table) |
---|
133 | refcolnames, reftable = self._normalize_columns(refcolumns, |
---|
134 | fullname=True) |
---|
135 | super(ForeignKeyConstraint, self).__init__(colnames, refcolnames, *p, |
---|
136 | **k) |
---|
137 | if table is not None: |
---|
138 | self._set_parent(table) |
---|
139 | |
---|
140 | def _get_referenced(self): |
---|
141 | return [e.column for e in self.elements] |
---|
142 | referenced = property(_get_referenced) |
---|
143 | |
---|
144 | def _get_reftable(self): |
---|
145 | return self.referenced[0].table |
---|
146 | reftable = property(_get_reftable) |
---|
147 | |
---|
148 | def autoname(self): |
---|
149 | """Mimic the database's automatic constraint names""" |
---|
150 | ret = "%(table)s_%(reftable)s_fkey"%dict( |
---|
151 | table=self.table.name, |
---|
152 | reftable=self.reftable.name, |
---|
153 | ) |
---|
154 | return ret |
---|
155 | |
---|
156 | def create(self, *args, **kwargs): |
---|
157 | from migrate.changeset.databases.visitor import get_engine_visitor |
---|
158 | visitorcallable = get_engine_visitor(self.table.bind, |
---|
159 | 'constraintgenerator') |
---|
160 | _engine_run_visitor(self.table.bind, visitorcallable, self) |
---|
161 | return self |
---|
162 | |
---|
163 | def drop(self, *args, **kwargs): |
---|
164 | from migrate.changeset.databases.visitor import get_engine_visitor |
---|
165 | visitorcallable = get_engine_visitor(self.table.bind, |
---|
166 | 'constraintdropper') |
---|
167 | _engine_run_visitor(self.table.bind, visitorcallable, self) |
---|
168 | self.columns.clear() |
---|
169 | return self |
---|
170 | |
---|
171 | def accept_schema_visitor(self, visitor, *p, **k): |
---|
172 | func = 'visit_migrate_foreign_key_constraint' |
---|
173 | return self._accept_schema_visitor(visitor, func, *p, **k) |
---|
174 | |
---|
175 | |
---|
176 | class CheckConstraint(ConstraintChangeset, schema.CheckConstraint): |
---|
177 | """Check constraint class.""" |
---|
178 | |
---|
179 | def __init__(self, sqltext, *args, **kwargs): |
---|
180 | cols = kwargs.pop('columns') |
---|
181 | colnames, table = self._normalize_columns(cols) |
---|
182 | table = kwargs.pop('table', table) |
---|
183 | ConstraintChangeset.__init__(self, *args, **kwargs) |
---|
184 | schema.CheckConstraint.__init__(self, sqltext, *args, **kwargs) |
---|
185 | if table is not None: |
---|
186 | self._set_parent(table) |
---|
187 | self.colnames = colnames |
---|
188 | |
---|
189 | def _set_parent(self, table): |
---|
190 | self.table = table |
---|
191 | return super(ConstraintChangeset, self)._set_parent(table) |
---|
192 | |
---|
193 | def create(self): |
---|
194 | from migrate.changeset.databases.visitor import get_engine_visitor |
---|
195 | visitorcallable = get_engine_visitor(self.table.bind, |
---|
196 | 'constraintgenerator') |
---|
197 | _engine_run_visitor(self.table.bind, visitorcallable, self) |
---|
198 | |
---|
199 | def drop(self): |
---|
200 | from migrate.changeset.databases.visitor import get_engine_visitor |
---|
201 | visitorcallable = get_engine_visitor(self.table.bind, |
---|
202 | 'constraintdropper') |
---|
203 | _engine_run_visitor(self.table.bind, visitorcallable, self) |
---|
204 | self.columns.clear() |
---|
205 | return self |
---|
206 | |
---|
207 | def autoname(self): |
---|
208 | return "%(table)s_%(cols)s_check" % \ |
---|
209 | {"table": self.table.name, "cols": "_".join(self.colnames)} |
---|
210 | |
---|
211 | def accept_schema_visitor(self, visitor, *args, **kwargs): |
---|
212 | func = 'visit_migrate_check_constraint' |
---|
213 | return self._accept_schema_visitor(visitor, func, *args, **kwargs) |
---|