root/galaxy-central/eggs/SQLAlchemy-0.5.6_dev_r6498-py2.6.egg/sqlalchemy/test/orm.py

リビジョン 3, 3.6 KB (コミッタ: kohda, 14 年 前)

Install Unix tools  http://hannonlab.cshl.edu/galaxy_unix_tools/galaxy.html

行番号 
1import inspect, re
2import config, testing
3from sqlalchemy import orm
4
5__all__ = 'mapper',
6
7
8_whitespace = re.compile(r'^(\s+)')
9
10def _find_pragma(lines, current):
11    m = _whitespace.match(lines[current])
12    basis = m and m.group() or ''
13
14    for line in reversed(lines[0:current]):
15        if 'testlib.pragma' in line:
16            return line
17        m = _whitespace.match(line)
18        indent = m and m.group() or ''
19
20        # simplistic detection:
21
22        # >> # testlib.pragma foo
23        # >> center_line()
24        if indent == basis:
25            break
26        # >> # testlib.pragma foo
27        # >> if fleem:
28        # >>     center_line()
29        if line.endswith(':'):
30            break
31    return None
32
33def _make_blocker(method_name, fallback):
34    """Creates tripwired variant of a method, raising when called.
35
36    To excempt an invocation from blockage, there are two options.
37
38    1) add a pragma in a comment::
39
40        # testlib.pragma exempt:methodname
41        offending_line()
42
43    2) add a magic cookie to the function's namespace::
44        __sa_baremethodname_exempt__ = True
45        ...
46        offending_line()
47        another_offending_lines()
48
49    The second is useful for testing and development.
50    """
51
52    if method_name.startswith('__') and method_name.endswith('__'):
53        frame_marker = '__sa_%s_exempt__' % method_name[2:-2]
54    else:
55        frame_marker = '__sa_%s_exempt__' % method_name
56    pragma_marker = 'exempt:' + method_name
57
58    def method(self, *args, **kw):
59        frame_r = None
60        try:
61            frame = inspect.stack()[1][0]
62            frame_r = inspect.getframeinfo(frame, 9)
63
64            module = frame.f_globals.get('__name__', '')
65
66            type_ = type(self)
67
68            pragma = _find_pragma(*frame_r[3:5])
69
70            exempt = (
71                (not module.startswith('sqlalchemy')) or
72                (pragma and pragma_marker in pragma) or
73                (frame_marker in frame.f_locals) or
74                ('self' in frame.f_locals and
75                 getattr(frame.f_locals['self'], frame_marker, False)))
76
77            if exempt:
78                supermeth = getattr(super(type_, self), method_name, None)
79                if (supermeth is None or
80                    getattr(supermeth, 'im_func', None) is method):
81                    return fallback(self, *args, **kw)
82                else:
83                    return supermeth(*args, **kw)
84            else:
85                raise AssertionError(
86                    "%s.%s called in %s, line %s in %s" % (
87                    type_.__name__, method_name, module, frame_r[1], frame_r[2]))
88        finally:
89            del frame
90    method.__name__ = method_name
91    return method
92
93def mapper(type_, *args, **kw):
94    forbidden = [
95        ('__hash__', 'unhashable', lambda s: id(s)),
96        ('__eq__', 'noncomparable', lambda s, o: s is o),
97        ('__ne__', 'noncomparable', lambda s, o: s is not o),
98        ('__cmp__', 'noncomparable', lambda s, o: object.__cmp__(s, o)),
99        ('__le__', 'noncomparable', lambda s, o: object.__le__(s, o)),
100        ('__lt__', 'noncomparable', lambda s, o: object.__lt__(s, o)),
101        ('__ge__', 'noncomparable', lambda s, o: object.__ge__(s, o)),
102        ('__gt__', 'noncomparable', lambda s, o: object.__gt__(s, o)),
103        ('__nonzero__', 'truthless', lambda s: 1), ]
104
105    if isinstance(type_, type) and type_.__bases__ == (object,):
106        for method_name, option, fallback in forbidden:
107            if (getattr(config.options, option, False) and
108                method_name not in type_.__dict__):
109                setattr(type_, method_name, _make_blocker(method_name, fallback))
110
111    return orm.mapper(type_, *args, **kw)
Note: リポジトリブラウザについてのヘルプは TracBrowser を参照してください。