root/galaxy-central/eggs/SQLAlchemy-0.5.6_dev_r6498-py2.6.egg/sqlalchemy/test/testing.py

リビジョン 3, 24.1 KB (コミッタ: kohda, 14 年 前)

Install Unix tools  http://hannonlab.cshl.edu/galaxy_unix_tools/galaxy.html

行番号 
1"""TestCase and TestSuite artifacts and testing decorators."""
2
3import itertools
4import operator
5import re
6import sys
7import types
8import warnings
9from cStringIO import StringIO
10
11from sqlalchemy.test import config, assertsql
12from sqlalchemy.util import function_named
13
14from sqlalchemy import exc as sa_exc, util, types as sqltypes, schema
15
16_ops = { '<': operator.lt,
17         '>': operator.gt,
18         '==': operator.eq,
19         '!=': operator.ne,
20         '<=': operator.le,
21         '>=': operator.ge,
22         'in': operator.contains,
23         'between': lambda val, pair: val >= pair[0] and val <= pair[1],
24         }
25
26# sugar ('testing.db'); set here by config() at runtime
27db = None
28
29# more sugar, installed by __init__
30requires = None
31
def fails_if(callable_):
    """Invert a test's outcome whenever ``callable_()`` returns true.

    The predicate is evaluated each time the decorated test is called.  When
    it is false the test runs and is reported as normal.  When it is true the
    test still runs but is expected to raise: the exception is reported on
    stdout and swallowed (the wrapper returns True), whereas a run that
    raises nothing is turned into an AssertionError.

    The first line of the predicate's docstring, or its ``__name__`` when it
    has no docstring, describes the condition in the printed messages.
    """

    label = getattr(callable_, '__doc__', None) or callable_.__name__
    description = label.split('\n')[0]

    def decorate(fn):
        fn_name = fn.__name__

        def maybe(*args, **kw):
            if not callable_():
                return fn(*args, **kw)

            try:
                fn(*args, **kw)
            except Exception:
                ex = sys.exc_info()[1]
                print ("'%s' failed as expected (condition: %s): %s " % (
                    fn_name, description, str(ex)))
                return True

            # Reaching this point means the test did not raise.
            raise AssertionError(
                "Unexpected success for '%s' (condition: %s)" %
                (fn_name, description))
        return function_named(maybe, fn_name)
    return decorate
62
63
def future(fn):
    """Mark a test as one that is always expected to fail.

    Used bare, without parentheses, as a decorator.  An exception raised by
    the test is printed and swallowed (the wrapper returns True); a test
    that completes without raising produces an AssertionError.
    """

    fn_name = fn.__name__

    def decorated(*args, **kw):
        try:
            fn(*args, **kw)
        except Exception:
            ex = sys.exc_info()[1]
            print ("Future test '%s' failed as expected: %s " % (
                fn_name, str(ex)))
            return True

        # No exception: the "future" behavior apparently works already.
        raise AssertionError(
            "Unexpected success for future test '%s'" % fn_name)
    return function_named(decorated, fn_name)
82
def fails_on(dbs, reason):
    """Mark a test as expected to fail on one database implementation.

    ``dbs`` is compared for equality against ``config.db.name``.  On any
    other database the test runs normally.  On the named database the test
    is still run (unlike ``crashes``) but the outcome is inverted: a failure
    is printed together with ``reason`` and reported as success (the wrapper
    returns True), while a passing test raises AssertionError.
    """

    def decorate(fn):
        fn_name = fn.__name__

        def maybe(*args, **kw):
            if config.db.name != dbs:
                return fn(*args, **kw)

            try:
                fn(*args, **kw)
            except Exception:
                print ("'%s' failed as expected on DB implementation "
                       "'%s': %s" % (
                    fn_name, config.db.name, reason))
                return True

            raise AssertionError(
                "Unexpected success for '%s' on DB implementation '%s'" %
                (fn_name, config.db.name))
        return function_named(maybe, fn_name)
    return decorate
112
def fails_on_everything_except(*dbs):
    """Mark a test as expected to fail on all but the listed databases.

    The mirror image of ``fails_on``: when ``config.db.name`` is one of
    ``dbs`` the test runs normally; on every other database a failure is
    printed and reported as success (the wrapper returns True) and a passing
    test raises AssertionError.
    """

    def decorate(fn):
        fn_name = fn.__name__

        def maybe(*args, **kw):
            if config.db.name in dbs:
                return fn(*args, **kw)

            try:
                fn(*args, **kw)
            except Exception:
                ex = sys.exc_info()[1]
                print ("'%s' failed as expected on DB implementation "
                       "'%s': %s" % (
                    fn_name, config.db.name, str(ex)))
                return True

            raise AssertionError(
                "Unexpected success for '%s' on DB implementation '%s'" %
                (fn_name, config.db.name))
        return function_named(maybe, fn_name)
    return decorate
139
def crashes(db, reason):
    """Mark a test as unsupported by a database implementation.

    When ``config.db.name`` equals ``db`` the test body is never executed:
    a message is printed and the wrapper returns True.  Intended for feature
    tests that cause deadlocks or other fatal problems.  The message is
    repeated on stderr when ``reason`` looks like a placeholder (see
    ``_should_carp_about_exclusion``).

    """
    carp = _should_carp_about_exclusion(reason)

    def decorate(fn):
        fn_name = fn.__name__

        def maybe(*args, **kw):
            if config.db.name != db:
                return fn(*args, **kw)

            msg = "'%s' unsupported on DB implementation '%s': %s" % (
                fn_name, config.db.name, reason)
            print (msg)
            if carp:
                print >> sys.stderr, msg
            return True
        return function_named(maybe, fn_name)
    return decorate
162
def _block_unconditionally(db, reason):
    """Mark a test as unsupported by a database implementation.

    The test is never run when ``config.db.name`` equals ``db``, whatever
    the server version; a message is printed and the wrapper returns True.
    Use when your assumptions are infallible; past, present and future.
    The message is repeated on stderr when ``reason`` looks like a
    placeholder (see ``_should_carp_about_exclusion``).

    """
    carp = _should_carp_about_exclusion(reason)

    def decorate(fn):
        fn_name = fn.__name__

        def maybe(*args, **kw):
            if config.db.name != db:
                return fn(*args, **kw)

            msg = "'%s' unsupported on DB implementation '%s': %s" % (
                fn_name, config.db.name, reason)
            print (msg)
            if carp:
                print >> sys.stderr, msg
            return True
        return function_named(maybe, fn_name)
    return decorate
186
187
def exclude(db, op, spec, reason):
    """Mark a test as unsupported by specific database server versions.

    The decision is delegated to ``_is_excluded(db, op, spec)`` each time
    the test is called.  An excluded test is not executed: a message is
    printed and the wrapper returns True.  The message is repeated on stderr
    when ``reason`` looks like a placeholder (see
    ``_should_carp_about_exclusion``).

    Stackable, both with other excludes and other decorators. Examples::

      # Not supported by mydb versions less than 1, 0
      @exclude('mydb', '<', (1,0), 'no savepoints before 1.0')
      # Other operators work too
      @exclude('bigdb', '==', (9,0,9), 'broken release')
      @exclude('yikesdb', 'in', ((0, 3, 'alpha2'), (0, 3, 'alpha3')),
               'unstable alphas')

    """
    carp = _should_carp_about_exclusion(reason)

    def decorate(fn):
        fn_name = fn.__name__

        def maybe(*args, **kw):
            if not _is_excluded(db, op, spec):
                return fn(*args, **kw)

            msg = "'%s' unsupported on DB %s version '%s': %s" % (
                fn_name, config.db.name, _server_version(), reason)
            print (msg)
            if carp:
                print >> sys.stderr, msg
            return True
        return function_named(maybe, fn_name)
    return decorate
215
216def _should_carp_about_exclusion(reason):
217    """Guard against forgotten exclusions."""
218    assert reason
219    for _ in ('todo', 'fixme', 'xxx'):
220        if _ in reason.lower():
221            return True
222    else:
223        if len(reason) < 4:
224            return True
225
def _is_excluded(db, op, spec):
    """Tell whether the configured db matches an exclusion specification.

    db:
      A dialect name; anything other than ``config.db.name`` yields False
      without querying the server version.
    op:
      A callable, or a key of ``_ops`` such as '=='
    spec:
      A value compared to the result of ``_server_version()`` by calling
      ``op(version, spec)``.

    Examples::
      # Not supported by mydb versions less than 1, 0
      _is_excluded('mydb', '<', (1,0))
      # Other operators work too
      _is_excluded('bigdb', '==', (9,0,9))
      _is_excluded('yikesdb', 'in', ((0, 3, 'alpha2'), (0, 3, 'alpha3')))
    """

    if config.db.name != db:
        return False

    version = _server_version()

    if hasattr(op, '__call__') and op:
        compare = op
    else:
        compare = _ops[op]
    return compare(version, spec)
252
253def _server_version(bind=None):
254    """Return a server_version_info tuple."""
255
256    if bind is None:
257        bind = config.db
258    return bind.dialect.server_version_info(bind.contextual_connect())
259
def skip_if(predicate, reason=None):
    """Skip a test if ``predicate()`` is true.

    The predicate is evaluated on every call of the decorated test.  A
    skipped test is not executed: a message is printed and the wrapper
    returns True.  ``reason`` defaults to the predicate's ``__name__``.
    """
    reason = reason or predicate.__name__

    def decorate(fn):
        fn_name = fn.__name__

        def maybe(*args, **kw):
            if not predicate():
                return fn(*args, **kw)

            msg = "'%s' skipped on DB %s version '%s': %s" % (
                fn_name, config.db.name, _server_version(), reason)
            print (msg)
            return True
        return function_named(maybe, fn_name)
    return decorate
275
def emits_warning(*messages):
    """Mark a test as emitting a warning.

    Called with no arguments, every SAWarning is ignored while the test
    runs.  Otherwise each string argument is installed as the ``message``
    of an 'ignore' filter for SAWarning via warnings.filterwarnings().
    SAPendingDeprecationWarning is ignored in both cases.  After the test
    returns or raises, ``resetwarnings()`` is called.
    """

    # TODO: it would be nice to assert that a named warning was
    # emitted. should work with some monkeypatching of warnings,
    # and may work on non-CPython if they keep to the spirit of
    # warnings.showwarning's docstring.
    # - update: jython looks ok, it uses cpython's module
    def decorate(fn):
        def safe(*args, **kw):
            # todo: should probably be strict about this, too
            warnings.filterwarnings(
                action='ignore',
                category=sa_exc.SAPendingDeprecationWarning)

            if messages:
                for message in messages:
                    warnings.filterwarnings(action='ignore',
                                            message=message,
                                            category=sa_exc.SAWarning)
            else:
                warnings.filterwarnings(action='ignore',
                                        category=sa_exc.SAWarning)

            try:
                return fn(*args, **kw)
            finally:
                resetwarnings()
        return function_named(safe, fn.__name__)
    return decorate
310
def emits_warning_on(db, *warnings):
    """Mark a test as emitting a warning on a specific dialect.

    ``db`` is either a dialect name, compared with ``config.db.name``, or a
    sequence that is unpacked into ``_is_excluded(*db)``.  When the
    configured database matches, the test runs wrapped in
    ``emits_warning(*warnings)``; otherwise it runs undecorated.

    With no warning arguments, all SAWarning failures are squelched.  Or
    pass one or more strings; these are forwarded to ``emits_warning``.
    """
    def decorate(fn):
        def maybe(*args, **kw):
            if isinstance(db, basestring):
                applies = config.db.name == db
            else:
                applies = _is_excluded(*db)

            if not applies:
                return fn(*args, **kw)

            wrapped = emits_warning(*warnings)(fn)
            return wrapped(*args, **kw)
        return function_named(maybe, fn.__name__)
    return decorate
334
def uses_deprecated(*messages):
    """Mark a test as immune from fatal deprecation warnings.

    Called with no arguments, every SADeprecationWarning is ignored while
    the test runs.  Otherwise each string argument is installed as the
    ``message`` of an 'ignore' filter for SADeprecationWarning via
    warnings.filterwarnings().  SAPendingDeprecationWarning is ignored in
    both cases.  After the test returns or raises, ``resetwarnings()`` is
    called.

    As a special case, a message starting with ``//`` has that prefix
    replaced by 'Call to deprecated function ', so that ``//name`` becomes
    'Call to deprecated function name'.
    """

    def decorate(fn):
        def safe(*args, **kw):
            # todo: should probably be strict about this, too
            warnings.filterwarnings(
                action='ignore',
                category=sa_exc.SAPendingDeprecationWarning)

            if not messages:
                warnings.filterwarnings(
                    action='ignore',
                    category=sa_exc.SADeprecationWarning)
            else:
                # Expand every '//' shorthand before installing any filter.
                patterns = []
                for m in messages:
                    if m.startswith('//'):
                        patterns.append(
                            'Call to deprecated function ' + m[2:])
                    else:
                        patterns.append(m)
                for pattern in patterns:
                    warnings.filterwarnings(
                        action='ignore',
                        message=pattern,
                        category=sa_exc.SADeprecationWarning)

            try:
                return fn(*args, **kw)
            finally:
                resetwarnings()
        return function_named(safe, fn.__name__)
    return decorate
373
def resetwarnings():
    """Install the warning filters used as the testing defaults.

    SAPendingDeprecationWarning is ignored; SADeprecationWarning and
    SAWarning are turned into errors.  On interpreters older than 2.4,
    FutureWarning is ignored as well.
    """

    testing_defaults = (
        ('ignore', sa_exc.SAPendingDeprecationWarning),
        ('error', sa_exc.SADeprecationWarning),
        ('error', sa_exc.SAWarning),
    )
    for action, category in testing_defaults:
        warnings.filterwarnings(action, category=category)

    # left disabled:
    #    warnings.simplefilter('error')

    if sys.version_info < (2, 4):
        warnings.filterwarnings('ignore', category=FutureWarning)
386
387
def against(*queries):
    """Boolean predicate, compares to testing database configuration.

    Each query is either a dialect name, which matches when it equals
    ``config.db.name``, or a 3-tuple of dialect name, operator and version
    specification, which matches when the name equals ``config.db.name``
    and ``operator(server_version, spec)`` is true.  The operator is a
    callable or a key of ``_ops``.  Returns True on the first matching
    query and False when none match::

      testing.against('mysql', 'postgres')
      testing.against(('mysql', '>=', (5, 0, 0)))
    """

    for query in queries:
        if isinstance(query, basestring):
            if config.db.name == query:
                return True
            continue

        name, op, spec = query
        if config.db.name != name:
            continue

        have = config.db.dialect.server_version_info(
            config.db.contextual_connect())

        if hasattr(op, '__call__') and op:
            compare = op
        else:
            compare = _ops[op]
        if compare(have, spec):
            return True
    return False
417
418def _chain_decorators_on(fn, *decorators):
419    """Apply a series of decorators to fn, returning a decorated function."""
420    for decorator in reversed(decorators):
421        fn = decorator(fn)
422    return fn
423
def rowset(results):
    """Collect the rows of a sql execution result into a set of tuples.

    Each row is converted with ``tuple()``; duplicates collapse.  Useful for
    asserting the results of an unordered query.
    """

    unique_rows = set()
    for row in results:
        unique_rows.add(tuple(row))
    return unique_rows
431
432
def eq_(a, b, msg=None):
    """Assert that ``a == b``.

    On failure the AssertionError carries ``msg`` when given, otherwise the
    reprs of both operands.
    """
    assert a == b, (msg or "%r != %r" % (a, b))
436
def ne_(a, b, msg=None):
    """Assert that ``a != b``.

    On failure the AssertionError carries ``msg`` when given, otherwise the
    reprs of both operands.
    """
    assert a != b, (msg or "%r == %r" % (a, b))
440
def is_(a, b, msg=None):
    """Assert that ``a`` and ``b`` are the same object.

    On failure the AssertionError carries ``msg`` when given, otherwise the
    reprs of both operands.
    """
    assert a is b, (msg or "%r is not %r" % (a, b))
444
def is_not_(a, b, msg=None):
    """Assert that ``a`` and ``b`` are distinct objects.

    On failure the AssertionError carries ``msg`` when given, otherwise the
    reprs of both operands.
    """
    assert a is not b, (msg or "%r is %r" % (a, b))
448
def startswith_(a, fragment, msg=None):
    """Assert that ``a.startswith(fragment)`` is true.

    On failure the AssertionError carries ``msg`` when given, otherwise the
    reprs of both arguments.
    """
    assert a.startswith(fragment), (
        msg or "%r does not start with %r" % (a, fragment))
453
def assert_raises(except_cls, callable_, *args, **kw):
    """Assert that ``callable_(*args, **kw)`` raises ``except_cls``.

    An exception of any other type propagates unchanged.
    """
    raised = True
    try:
        callable_(*args, **kw)
    except except_cls:
        pass
    else:
        raised = False

    # The check lives after the try statement so that it also works when
    # the expected exception class is AssertionError itself.
    assert raised, "Callable did not raise an exception"
463
def assert_raises_message(except_cls, msg, callable_, *args, **kwargs):
    """Assert that a call raises ``except_cls`` with a matching message.

    ``callable_(*args, **kwargs)`` must raise ``except_cls`` and ``msg``,
    used as a regular expression with ``re.search``, must match
    ``str()`` of the raised exception.  An AssertionError is raised when the
    callable raises nothing or when the message does not match; exceptions
    of other types propagate unchanged.
    """
    raised = False
    error = None
    try:
        callable_(*args, **kwargs)
    except except_cls:
        raised = True
        error = sys.exc_info()[1]

    # Assert outside the try block so it works for AssertionError too:
    # inside it, the "did not raise" failure would itself be caught whenever
    # except_cls is AssertionError (or a base class of it) and then be
    # matched against msg as if the callable had raised it.
    assert raised, "Callable did not raise an exception"
    assert re.search(msg, str(error)), "%r !~ %s" % (msg, error)
470
def fail(msg):
    """Fail unconditionally by asserting False with ``msg`` as the message."""
    assert False, msg
473   
def fixture(table, columns, *rows):
    """Insert data into table after creation.

    Registers an 'after-create' DDL listener on ``table``.  When invoked,
    the listener executes ``table.insert()`` on the supplied connection with
    one parameter dictionary per entry of ``rows``, keyed by the ``key`` of
    each entry in ``columns`` (paired positionally).
    """
    def onload(event, schema_item, connection):
        statement = table.insert()
        keys = [col.key for col in columns]
        parameter_sets = []
        for values in rows:
            parameter_sets.append(dict(zip(keys, values)))
        connection.execute(statement, parameter_sets)
    table.append_ddl_listener('after-create', onload)
482
def resolve_artifact_names(fn):
    """Decorator, run fn with extra global names taken from the instance.

    On each call a copy of the function's globals is updated with the
    mappings named by ``self._artifact_registries`` (``self`` being the
    first positional argument), and a new function object built from fn's
    code with that namespace as its globals is invoked instead of fn.
    Because the globals are a copy, the 'global' statement will not work as
    expected inside a decorated function.

    """
    # This could be automatically applied to framework and test_ methods in
    # the MappedTest-derived test suites but... *some* explicitness for this
    # magic is probably good.  Especially as 'global' won't work- these
    # rebound functions aren't regular Python..
    #
    # Also: it's lame that CPython accepts a dict-subclass for globals, but
    # only calls dict methods.  That would allow 'global' to pass through to
    # the func_globals.
    def resolved(*args, **kwargs):
        instance = args[0]
        namespace = dict(fn.func_globals)
        for registry_name in instance._artifact_registries:
            namespace.update(getattr(instance, registry_name))
        # jython bug #1034
        rebound = types.FunctionType(
            fn.func_code, namespace, fn.func_name, fn.func_defaults,
            fn.func_closure)
        return rebound(*args, **kwargs)
    return function_named(resolved, fn.func_name)
509
class adict(dict):
    """Dict keys available as attributes.  Shadows.

    Attribute access consults the dictionary's keys first, so a key hides a
    regular attribute or method of the same name.
    """
    def __getattribute__(self, key):
        try:
            value = self[key]
        except KeyError:
            # Not a key: fall back to ordinary attribute lookup.
            value = dict.__getattribute__(self, key)
        return value

    def get_all(self, *keys):
        """Return the values stored under ``keys`` as a tuple, in order."""
        return tuple(self[key] for key in keys)
520
521
class TestBase(object):
    """Base class declaring the class-level switches for test classes."""

    # Database names for which the test class always runs, whatever the
    # constraints below say.
    __whitelist__ = ()

    # Names of requirements, matching testing.requires decorators.
    __requires__ = ()

    # Dialect names the test class must not run against.
    __unsupported_on__ = ()

    # When set, the *single* dialect the test class may run against.  For
    # several dialects, use __unsupported_on__ and invert.
    __only_on__ = None

    # A sequence of no-arg callables; the whole testcase is skipped if any
    # of them is true.
    __skip_if__ = None

    _artifact_registries = ()

    def assert_(self, val, msg=None):
        """Assert that ``val`` is true, with ``msg`` as the failure message."""
        assert val, msg
545       
class AssertsCompiledSQL(object):
    """Mixin providing ``assert_compile`` for checking compiled SQL text."""

    def assert_compile(self, clause, result, params=None, checkparams=None, dialect=None):
        """Compile ``clause`` and compare its string form with ``result``.

        ``dialect`` defaults to the instance's ``__dialect__`` attribute,
        or None when there is none.  The keys of ``params``, if given, are
        passed to ``compile()`` as ``column_keys``.  Newlines are removed
        from the compiled string before it is compared.  When
        ``checkparams`` is given, ``construct_params(params)`` of the
        compiled object must equal it as well.
        """
        if dialect is None:
            dialect = getattr(self, '__dialect__', None)

        keys = None
        if params is not None:
            keys = params.keys()

        compiled = clause.compile(column_keys=keys, dialect=dialect)

        print ("\nSQL String:\n" + str(compiled) + repr(compiled.params))

        flattened = str(compiled).replace('\n', '')

        eq_(flattened, result,
            "%r != %r on dialect %r" % (flattened, result, dialect))

        if checkparams is not None:
            eq_(compiled.construct_params(params), checkparams)
566
class ComparesTables(object):
    """Mixin with an assertion comparing a table to a reflected table."""

    def assert_tables_equal(self, table, reflected_table, strict_types=False):
        """Assert that ``reflected_table`` matches ``table``.

        Columns are paired positionally and must agree in name, primary_key
        flag, nullable flag and foreign key target column names; String
        columns must also agree in length.  With ``strict_types`` the
        column types must be of exactly the same class.  Otherwise the two
        type classes only need to share at least one class in their MROs
        that is not part of ``TypeEngine``'s own MRO.  Finally the primary
        keys must have the same size and every primary key column name of
        ``table`` must be present in the reflected primary key.
        """
        generic_bases = sqltypes.TypeEngine.__mro__
        assert len(table.c) == len(reflected_table.c)

        for col, reflected_col in zip(table.c, reflected_table.c):
            eq_(col.name, reflected_col.name)
            assert reflected_col is reflected_table.c[col.name]
            eq_(col.primary_key, reflected_col.primary_key)
            eq_(col.nullable, reflected_col.nullable)

            if strict_types:
                assert type(reflected_col.type) is type(col.type), \
                    "Type '%s' doesn't correspond to type '%s'" % (
                        reflected_col.type, col.type)
            else:
                reflected_specific = set(
                    type(reflected_col.type).__mro__).difference(generic_bases)
                declared_specific = set(
                    type(col.type).__mro__).difference(generic_bases)
                shared = reflected_specific.intersection(declared_specific)
                assert len(shared) > 0, \
                    "Type '%s' doesn't correspond to type '%s'" % (
                        reflected_col.type, col.type)

            if isinstance(col.type, sqltypes.String):
                eq_(col.type.length, reflected_col.type.length)

            declared_targets = set(
                [fk.column.name for fk in col.foreign_keys])
            reflected_targets = set(
                [fk.column.name for fk in reflected_col.foreign_keys])
            eq_(declared_targets, reflected_targets)

        assert len(table.primary_key) == len(reflected_table.primary_key)
        for pk_col in table.primary_key:
            assert reflected_table.primary_key.columns[pk_col.name]
594
595
class AssertsExecutionResults(object):
    """Mixin of assertions about query results and emitted SQL.

    The result assertions call ``self.assert_``, which this class does not
    define itself.
    """

    def assert_result(self, result, class_, *objects):
        """Print ``result`` as a list and check it with ``assert_list``."""
        rows = list(result)
        print (repr(rows))
        self.assert_list(rows, class_, objects)

    def assert_list(self, result, class_, list):
        """Check ``result`` item by item against the specs in ``list``.

        Both must have the same length; each pair is then passed to
        ``assert_row``.
        """
        same_size = len(result) == len(list)
        self.assert_(same_size,
                     "result list is not the same size as test list, " +
                     "for class " + class_.__name__)
        for index, spec in enumerate(list):
            self.assert_row(class_, result[index], spec)

    def assert_row(self, class_, rowobj, desc):
        """Check one object against the attribute spec ``desc``.

        ``rowobj`` must be exactly of ``class_``.  A tuple value in ``desc``
        is read as (class, spec): a list spec is checked with
        ``assert_list`` and anything else with ``assert_row``, against the
        attribute of the same name.  Any other value must equal the
        attribute.
        """
        self.assert_(rowobj.__class__ is class_,
                     "item class is not " + repr(class_))
        for key, value in desc.iteritems():
            if not isinstance(value, tuple):
                self.assert_(getattr(rowobj, key) == value,
                             "attribute %s value %s does not match %s" % (
                             key, getattr(rowobj, key), value))
            elif isinstance(value[1], list):
                self.assert_list(getattr(rowobj, key), value[0], value[1])
            else:
                self.assert_row(value[0], getattr(rowobj, key), value[1])

    def assert_unordered_result(self, result, cls, *expected):
        """As assert_result, but the order of objects is not considered.

        Every found object must be exactly of ``cls`` and the number of
        distinct found objects must equal the number of expected specs.
        Each spec is then matched against the remaining found objects, and
        a matched object is not considered again.  Returns True on success.

        The algorithm is very expensive but not a big deal for the small
        numbers of rows that the test suite manipulates.
        """

        class frozendict(dict):
            # Hash by identity so the specs can be members of a set.
            def __hash__(self):
                return id(self)

        found = util.IdentitySet(result)
        wanted = set([frozendict(spec) for spec in expected])

        for obj in found:
            if not type(obj) == cls:
                fail('Unexpected type "%s", expected "%s"' % (
                    type(obj).__name__, cls.__name__))

        if len(found) != len(wanted):
            fail('Unexpected object count "%s", expected "%s"' % (
                len(found), len(wanted)))

        NOVALUE = object()

        def _compare_item(obj, spec):
            for key, value in spec.iteritems():
                if isinstance(value, tuple):
                    # Nested (class, specs) entry: recurse and turn an
                    # assertion failure into "no match".
                    try:
                        self.assert_unordered_result(
                            getattr(obj, key), value[0], *value[1])
                    except AssertionError:
                        return False
                elif getattr(obj, key, NOVALUE) != value:
                    return False
            return True

        for expected_item in wanted:
            for found_item in found:
                if _compare_item(found_item, expected_item):
                    # Safe despite the iteration: the loop ends right away.
                    found.remove(found_item)
                    break
            else:
                fail(
                    "Expected %s instance with attributes %s not found." % (
                    cls.__name__, repr(expected_item)))
        return True

    def assert_sql_execution(self, db, callable_, *rules):
        """Run ``callable_`` with ``rules`` installed on assertsql.asserter.

        The rules are cleared again whether or not the call succeeds.
        """
        assertsql.asserter.add_rules(rules)
        try:
            callable_()
            assertsql.asserter.statement_complete()
        finally:
            assertsql.asserter.clear_rules()

    def assert_sql(self, db, callable_, list_, with_sequences=None):
        """Assert the SQL emitted by ``callable_`` against a list of rules.

        ``with_sequences`` replaces ``list_`` when it is given and the
        configured database is firebird, oracle or postgres.  A dict rule
        becomes an ``AllOf`` of ``ExactSQL`` rules built from its items;
        any other rule is unpacked into a single ``ExactSQL``.
        """
        sequence_dbs = ('firebird', 'oracle', 'postgres')
        if with_sequences is not None and config.db.name in sequence_dbs:
            rules = with_sequences
        else:
            rules = list_

        def as_rule(rule):
            if isinstance(rule, dict):
                return assertsql.AllOf(*[
                    assertsql.ExactSQL(k, v) for k, v in rule.iteritems()
                ])
            return assertsql.ExactSQL(*rule)

        newrules = [as_rule(rule) for rule in rules]

        self.assert_sql_execution(db, callable_, *newrules)

    def assert_sql_count(self, db, callable_, count):
        """Assert on the statements run by ``callable_`` via CountStatements."""
        self.assert_sql_execution(db, callable_, assertsql.CountStatements(count))
698
699
Note: リポジトリブラウザについてのヘルプは TracBrowser を参照してください。