root/galaxy-central/eggs/sqlalchemy_migrate-0.5.4-py2.6.egg/migrate/changeset/constraint.py

リビジョン 3, 7.5 KB (コミッタ: kohda, 14 年 前)

Install Unix tools  http://hannonlab.cshl.edu/galaxy_unix_tools/galaxy.html

行番号 
1"""
2   This module defines standalone schema constraint classes.
3"""
4import sqlalchemy
5from sqlalchemy import schema
6
7
8class ConstraintChangeset(object):
9    """Base class for Constraint classes.
10    """
11
12    def _normalize_columns(self, cols, fullname=False):
13        """Given: column objects or names; return col names and
14        (maybe) a table"""
15        colnames = []
16        table = None
17        for col in cols:
18            if isinstance(col, schema.Column):
19                if col.table is not None and table is None:
20                    table = col.table
21                if fullname:
22                    col = '.'.join((col.table.name, col.name))
23                else:
24                    col = col.name
25            colnames.append(col)
26        return colnames, table
27
28    def create(self, engine=None):
29        """Create the constraint in the database.
30
31        :param engine: the database engine to use. If this is
32          :keyword:`None` the instance's engine will be used
33        :type engine: :class:`sqlalchemy.engine.base.Engine`
34        """
35        if engine is None:
36            engine = self.engine
37        engine.create(self)
38
39    def drop(self, engine=None):
40        """Drop the constraint from the database.
41
42        :param engine: the database engine to use. If this is
43          :keyword:`None` the instance's engine will be used
44        :type engine: :class:`sqlalchemy.engine.base.Engine`
45        """
46        if engine is None:
47            engine = self.engine
48        engine.drop(self)
49
50    def _derived_metadata(self):
51        return self.table._derived_metadata()
52
53    def accept_schema_visitor(self, visitor, *p, **k):
54        """
55        :raises: :exc:`NotImplementedError` if this method is not \
56overridden by a subclass
57        """
58        raise NotImplementedError()
59
60    def _accept_schema_visitor(self, visitor, func, *p, **k):
61        """Call the visitor only if it defines the given function"""
62        try:
63            func = getattr(visitor, func)
64        except AttributeError:
65            return
66        return func(self)
67
68    def autoname(self):
69        """Automatically generate a name for the constraint instance.
70
71        Subclasses must implement this method.
72
73        :raises: :exc:`NotImplementedError` if this method is not \
74overridden by a subclass
75        """
76        raise NotImplementedError()
77
78
79def _engine_run_visitor(engine, visitorcallable, element, **kwargs):
80    conn = engine.connect()
81    try:
82        element.accept_schema_visitor(visitorcallable(conn))
83    finally:
84        conn.close()
85
86
87class PrimaryKeyConstraint(ConstraintChangeset, schema.PrimaryKeyConstraint):
88    """Primary key constraint class."""
89
90    def __init__(self, *cols, **kwargs):
91        colnames, table = self._normalize_columns(cols)
92        table = kwargs.pop('table', table)
93        super(PrimaryKeyConstraint, self).__init__(*colnames, **kwargs)
94        if table is not None:
95            self._set_parent(table)
96
97    def _set_parent(self, table):
98        self.table = table
99        return super(ConstraintChangeset, self)._set_parent(table)
100
101    def create(self, *args, **kwargs):
102        from migrate.changeset.databases.visitor import get_engine_visitor
103        visitorcallable = get_engine_visitor(self.table.bind,
104                                             'constraintgenerator')
105        _engine_run_visitor(self.table.bind, visitorcallable, self)
106
107    def autoname(self):
108        """Mimic the database's automatic constraint names"""
109        ret = "%(table)s_pkey"%dict(
110            table=self.table.name,
111        )
112        return ret
113
114    def drop(self, *args, **kwargs):
115        from migrate.changeset.databases.visitor import get_engine_visitor
116        visitorcallable = get_engine_visitor(self.table.bind,
117                                             'constraintdropper')
118        _engine_run_visitor(self.table.bind, visitorcallable, self)
119        self.columns.clear()
120        return self
121
122    def accept_schema_visitor(self, visitor, *p, **k):
123        func = 'visit_migrate_primary_key_constraint'
124        return self._accept_schema_visitor(visitor, func, *p, **k)
125
126
127class ForeignKeyConstraint(ConstraintChangeset, schema.ForeignKeyConstraint):
128    """Foreign key constraint class."""
129
130    def __init__(self, columns, refcolumns, *p, **k):
131        colnames, table = self._normalize_columns(columns)
132        table = k.pop('table', table)
133        refcolnames, reftable = self._normalize_columns(refcolumns,
134                                                        fullname=True)
135        super(ForeignKeyConstraint, self).__init__(colnames, refcolnames, *p,
136                                                   **k)
137        if table is not None:
138            self._set_parent(table)
139
140    def _get_referenced(self):
141        return [e.column for e in self.elements]
142    referenced = property(_get_referenced)
143
144    def _get_reftable(self):
145        return self.referenced[0].table
146    reftable = property(_get_reftable)
147
148    def autoname(self):
149        """Mimic the database's automatic constraint names"""
150        ret = "%(table)s_%(reftable)s_fkey"%dict(
151            table=self.table.name,
152            reftable=self.reftable.name,
153        )
154        return ret
155
156    def create(self, *args, **kwargs):
157        from migrate.changeset.databases.visitor import get_engine_visitor
158        visitorcallable = get_engine_visitor(self.table.bind,
159                                             'constraintgenerator')
160        _engine_run_visitor(self.table.bind, visitorcallable, self)
161        return self
162
163    def drop(self, *args, **kwargs):
164        from migrate.changeset.databases.visitor import get_engine_visitor
165        visitorcallable = get_engine_visitor(self.table.bind,
166                                             'constraintdropper')
167        _engine_run_visitor(self.table.bind, visitorcallable, self)
168        self.columns.clear()
169        return self
170
171    def accept_schema_visitor(self, visitor, *p, **k):
172        func = 'visit_migrate_foreign_key_constraint'
173        return self._accept_schema_visitor(visitor, func, *p, **k)
174
175
176class CheckConstraint(ConstraintChangeset, schema.CheckConstraint):
177    """Check constraint class."""
178
179    def __init__(self, sqltext, *args, **kwargs):
180        cols = kwargs.pop('columns')
181        colnames, table = self._normalize_columns(cols)
182        table = kwargs.pop('table', table)
183        ConstraintChangeset.__init__(self, *args, **kwargs)
184        schema.CheckConstraint.__init__(self, sqltext, *args, **kwargs)
185        if table is not None:
186            self._set_parent(table)
187        self.colnames = colnames
188
189    def _set_parent(self, table):
190        self.table = table
191        return super(ConstraintChangeset, self)._set_parent(table)
192
193    def create(self):
194        from migrate.changeset.databases.visitor import get_engine_visitor
195        visitorcallable = get_engine_visitor(self.table.bind,
196                                             'constraintgenerator')
197        _engine_run_visitor(self.table.bind, visitorcallable, self)
198
199    def drop(self):
200        from migrate.changeset.databases.visitor import get_engine_visitor
201        visitorcallable = get_engine_visitor(self.table.bind,
202                                             'constraintdropper')
203        _engine_run_visitor(self.table.bind, visitorcallable, self)
204        self.columns.clear()
205        return self
206
207    def autoname(self):
208        return "%(table)s_%(cols)s_check" % \
209            {"table": self.table.name, "cols": "_".join(self.colnames)}
210
211    def accept_schema_visitor(self, visitor, *args, **kwargs):
212        func = 'visit_migrate_check_constraint'
213        return self._accept_schema_visitor(visitor, func, *args, **kwargs)
Note: リポジトリブラウザについてのヘルプは TracBrowser を参照してください。