1 | import inspect, re |
---|
2 | import config, testing |
---|
3 | from sqlalchemy import orm |
---|
4 | |
---|
5 | __all__ = 'mapper', |
---|
6 | |
---|
7 | |
---|
8 | _whitespace = re.compile(r'^(\s+)') |
---|
9 | |
---|
10 | def _find_pragma(lines, current): |
---|
11 | m = _whitespace.match(lines[current]) |
---|
12 | basis = m and m.group() or '' |
---|
13 | |
---|
14 | for line in reversed(lines[0:current]): |
---|
15 | if 'testlib.pragma' in line: |
---|
16 | return line |
---|
17 | m = _whitespace.match(line) |
---|
18 | indent = m and m.group() or '' |
---|
19 | |
---|
20 | # simplistic detection: |
---|
21 | |
---|
22 | # >> # testlib.pragma foo |
---|
23 | # >> center_line() |
---|
24 | if indent == basis: |
---|
25 | break |
---|
26 | # >> # testlib.pragma foo |
---|
27 | # >> if fleem: |
---|
28 | # >> center_line() |
---|
29 | if line.endswith(':'): |
---|
30 | break |
---|
31 | return None |
---|
32 | |
---|
33 | def _make_blocker(method_name, fallback): |
---|
34 | """Creates tripwired variant of a method, raising when called. |
---|
35 | |
---|
36 | To excempt an invocation from blockage, there are two options. |
---|
37 | |
---|
38 | 1) add a pragma in a comment:: |
---|
39 | |
---|
40 | # testlib.pragma exempt:methodname |
---|
41 | offending_line() |
---|
42 | |
---|
43 | 2) add a magic cookie to the function's namespace:: |
---|
44 | __sa_baremethodname_exempt__ = True |
---|
45 | ... |
---|
46 | offending_line() |
---|
47 | another_offending_lines() |
---|
48 | |
---|
49 | The second is useful for testing and development. |
---|
50 | """ |
---|
51 | |
---|
52 | if method_name.startswith('__') and method_name.endswith('__'): |
---|
53 | frame_marker = '__sa_%s_exempt__' % method_name[2:-2] |
---|
54 | else: |
---|
55 | frame_marker = '__sa_%s_exempt__' % method_name |
---|
56 | pragma_marker = 'exempt:' + method_name |
---|
57 | |
---|
58 | def method(self, *args, **kw): |
---|
59 | frame_r = None |
---|
60 | try: |
---|
61 | frame = inspect.stack()[1][0] |
---|
62 | frame_r = inspect.getframeinfo(frame, 9) |
---|
63 | |
---|
64 | module = frame.f_globals.get('__name__', '') |
---|
65 | |
---|
66 | type_ = type(self) |
---|
67 | |
---|
68 | pragma = _find_pragma(*frame_r[3:5]) |
---|
69 | |
---|
70 | exempt = ( |
---|
71 | (not module.startswith('sqlalchemy')) or |
---|
72 | (pragma and pragma_marker in pragma) or |
---|
73 | (frame_marker in frame.f_locals) or |
---|
74 | ('self' in frame.f_locals and |
---|
75 | getattr(frame.f_locals['self'], frame_marker, False))) |
---|
76 | |
---|
77 | if exempt: |
---|
78 | supermeth = getattr(super(type_, self), method_name, None) |
---|
79 | if (supermeth is None or |
---|
80 | getattr(supermeth, 'im_func', None) is method): |
---|
81 | return fallback(self, *args, **kw) |
---|
82 | else: |
---|
83 | return supermeth(*args, **kw) |
---|
84 | else: |
---|
85 | raise AssertionError( |
---|
86 | "%s.%s called in %s, line %s in %s" % ( |
---|
87 | type_.__name__, method_name, module, frame_r[1], frame_r[2])) |
---|
88 | finally: |
---|
89 | del frame |
---|
90 | method.__name__ = method_name |
---|
91 | return method |
---|
92 | |
---|
93 | def mapper(type_, *args, **kw): |
---|
94 | forbidden = [ |
---|
95 | ('__hash__', 'unhashable', lambda s: id(s)), |
---|
96 | ('__eq__', 'noncomparable', lambda s, o: s is o), |
---|
97 | ('__ne__', 'noncomparable', lambda s, o: s is not o), |
---|
98 | ('__cmp__', 'noncomparable', lambda s, o: object.__cmp__(s, o)), |
---|
99 | ('__le__', 'noncomparable', lambda s, o: object.__le__(s, o)), |
---|
100 | ('__lt__', 'noncomparable', lambda s, o: object.__lt__(s, o)), |
---|
101 | ('__ge__', 'noncomparable', lambda s, o: object.__ge__(s, o)), |
---|
102 | ('__gt__', 'noncomparable', lambda s, o: object.__gt__(s, o)), |
---|
103 | ('__nonzero__', 'truthless', lambda s: 1), ] |
---|
104 | |
---|
105 | if isinstance(type_, type) and type_.__bases__ == (object,): |
---|
106 | for method_name, option, fallback in forbidden: |
---|
107 | if (getattr(config.options, option, False) and |
---|
108 | method_name not in type_.__dict__): |
---|
109 | setattr(type_, method_name, _make_blocker(method_name, fallback)) |
---|
110 | |
---|
111 | return orm.mapper(type_, *args, **kw) |
---|