1 | import sys, new |
---|
2 | from galaxy.tools.parameters import grouping |
---|
3 | from galaxy.tools.parameters import basic |
---|
4 | from base.twilltestcase import TwillTestCase |
---|
5 | import galaxy.model |
---|
6 | from galaxy.model.orm import * |
---|
7 | from galaxy.model.mapping import context as sa_session |
---|
8 | |
---|
# Toolbox whose tools' tests build_tests() turns into test classes; while it
# stays None, build_tests() returns without generating anything.
toolbox = None
---|
10 | |
---|
class ToolTestCase( TwillTestCase ):
    """Abstract test case that runs tests based on a `galaxy.tools.test.ToolTest`"""

    def do_it( self, testdef ):
        """
        Run the single tool test described by `testdef`.

        Steps: log in as the test user, create a new history, upload the
        files the test requires, submit every page of the tool form, check
        each declared output with `verify_dataset_correctness` and finally
        delete the history that was looked up after `new_history()`.

        Raises `testdef.exception` (or a generic `Exception` when none was
        recorded) if the test definition failed to parse, and
        `AssertionError` if the freshly created history is not empty.
        """
        # If the test generation had an error, raise
        if testdef.error:
            if testdef.exception:
                raise testdef.exception
            raise Exception( "Test parse failure" )
        # Start with a new history
        self.logout()
        self.login( email='test@bx.psu.edu' )
        admin_user = sa_session.query( galaxy.model.User ) \
                               .filter( galaxy.model.User.table.c.email=='test@bx.psu.edu' ) \
                               .one()
        self.new_history()
        # Most recently created, non-deleted history of the test user; it is
        # deleted again once the outputs have been verified.
        latest_history = sa_session.query( galaxy.model.History ) \
                                   .filter( and_( galaxy.model.History.table.c.deleted==False,
                                                  galaxy.model.History.table.c.user_id==admin_user.id ) ) \
                                   .order_by( desc( galaxy.model.History.table.c.create_time ) ) \
                                   .first()
        assert latest_history is not None, "Problem retrieving latest_history from database"
        if len( self.get_history_as_data_list() ) > 0:
            raise AssertionError("ToolTestCase.do_it failed")
        # Upload any needed files
        self.__upload_required_files( testdef )
        all_inputs = self.__collect_inputs( testdef )
        # See if we have a grouping.Repeat element
        repeat_name = None
        for input_name, input_value in testdef.tool.inputs_by_page[0].items():
            if isinstance( input_value, grouping.Repeat ):
                repeat_name = input_name
                break
        # Check if we need to verify the number of outputs created dynamically
        # by the tool.  None (rather than 0/False) means "do not wait", so an
        # empty history before the run still yields a usable baseline of 0.
        if testdef.tool.force_history_refresh:
            baseline_count = len( self.get_history_as_data_list() )
        else:
            baseline_count = None
        # Do the first page
        page_inputs = self.__expand_grouping( testdef.tool.inputs_by_page[0], all_inputs )
        # Run the tool
        self.run_tool( testdef.tool.id, repeat_name=repeat_name, **page_inputs )
        print( "page_inputs (0) %s" % ( page_inputs, ) )
        # Do other pages if they exist
        for i in range( 1, testdef.tool.npages ):
            page_inputs = self.__expand_grouping( testdef.tool.inputs_by_page[i], all_inputs )
            self.submit_form( **page_inputs )
            print( "page_inputs (%i) %s" % ( i, page_inputs ) )
        # Check the results ( handles single or multiple tool outputs ). Make sure to pass the correct hid.
        # The output datasets from the tool should be in the same order as the testdef.outputs.
        data_list = self.__wait_for_outputs( testdef, baseline_count )
        self.assertTrue( data_list )
        # The outputs are the last len( testdef.outputs ) items of the history,
        # so walk them with negative indexes counting up towards -1.
        elem_index = 0 - len( testdef.outputs )
        for _, outfile, attributes in testdef.outputs:
            # Get the correct hid
            elem = data_list[ elem_index ]
            self.assertTrue( elem is not None )
            elem_hid = elem.get( 'hid' )
            elem_index += 1
            self.verify_dataset_correctness( outfile, hid=elem_hid, maxseconds=testdef.maxseconds, attributes=attributes )
        self.delete_history( id=self.security.encode_id( latest_history.id ) )

    def __upload_required_files( self, testdef ):
        """Upload every file in `testdef.required_files` and apply its post-upload attribute edits."""
        for fname, extra in testdef.required_files:
            ftype = extra.get( 'ftype', 'auto' )
            self.upload_file( fname,
                              ftype=ftype,
                              dbkey=extra.get( 'dbkey', 'hg17' ),
                              metadata=extra.get( 'metadata', [] ),
                              composite_data=extra.get( 'composite_data', [] ) )
            print( "Uploaded file:  %s , ftype:  %s , extra:  %s" % ( fname, ftype, extra ) )
            # Post upload attribute editing; currently only renaming is supported
            for edit_att in extra.get( 'edit_attributes', [] ):
                edit_type = edit_att.get( 'type', None )
                if edit_type != 'name':
                    raise Exception( 'edit_attributes type (%s) is unimplemented' % edit_type )
                new_name = edit_att.get( 'value', None )
                assert new_name, 'You must supply the new dataset name as the value tag of the edit_attributes tag'
                # The dataset just uploaded is the last item of the history
                hda_id = self.get_history_as_data_list()[-1].get( 'id' )
                try:
                    self.edit_hda_attribute_info( hda_id = str(hda_id), new_name = new_name )
                except Exception:
                    # Renaming is best effort: report the failure and carry on
                    print( "### call to edit_hda failed for hda_id %s, new_name=%s" % ( hda_id, new_name ) )

    def __collect_inputs( self, testdef ):
        """
        Return a dict mapping each input name in `testdef.inputs` to its value.

        We need to handle the case where we've uploaded a valid compressed
        file since the upload tool will have uncompressed it on the fly, so a
        single trailing '.zip' or '.gz' is removed from non-empty values.
        """
        all_inputs = {}
        for name, value, _ in testdef.inputs:
            if value:
                for end in ( '.zip', '.gz' ):
                    if value.endswith( end ):
                        # Slice the suffix off; str.rstrip() would strip any
                        # trailing run of the characters in `end` instead.
                        value = value[ :-len( end ) ]
                        break
            all_inputs[ name ] = value
        return all_inputs

    def __wait_for_outputs( self, testdef, baseline_count ):
        """
        Return the history's data list once the expected outputs are present.

        When `baseline_count` is None the history is read once and returned.
        Otherwise the history is re-read until it holds at least
        `len( testdef.outputs )` more items than `baseline_count`.
        """
        data_list = None
        while data_list is None:
            data_list = self.get_history_as_data_list()
            if baseline_count is not None and len( testdef.outputs ) > ( len( data_list ) - baseline_count ):
                data_list = None
        return data_list

    def __expand_grouping( self, tool_inputs, declared_inputs, prefix='' ):
        """
        Flatten `tool_inputs` into a dict of form-field names and values.

        Field names are built by joining `prefix` and the parameter name with
        '|'.  Values are looked up in `declared_inputs`; string values are
        split on ',' into lists, other values are passed through unchanged.
        Conditionals contribute their test parameter plus the inputs of the
        case whose value matches the declared one; Repeats are expanded for
        a single repetition (index 0) only.
        """
        expanded_inputs = {}
        for _, value in tool_inputs.items():
            if isinstance( value, grouping.Conditional ):
                if prefix:
                    new_prefix = "%s|%s" % ( prefix, value.name )
                else:
                    new_prefix = value.name
                test_key = "%s|%s" % ( new_prefix, value.test_param.name )
                for case in value.cases:
                    if declared_inputs[ value.test_param.name ] == case.value:
                        if isinstance( case.value, str ):
                            expanded_inputs[ test_key ] = case.value.split( "," )
                        else:
                            expanded_inputs[ test_key ] = case.value
                        for input_name, input_value in case.inputs.items():
                            expanded_inputs.update( self.__expand_grouping( { input_name:input_value }, declared_inputs, prefix = new_prefix ) )
            elif isinstance( value, grouping.Repeat ):
                for repeat_index in xrange( 0, 1 ): #need to allow for and figure out how many repeats we have
                    new_prefix = "%s_%d" % ( value.name, repeat_index )
                    if prefix:
                        new_prefix = "%s|%s" % ( prefix, new_prefix )
                    for r_value in value.inputs.itervalues():
                        expanded_inputs.update( self.__expand_grouping( { new_prefix : r_value }, declared_inputs, prefix = new_prefix ) )
            else:
                declared = declared_inputs[ value.name ]
                if prefix:
                    field_name = "%s|%s" % ( prefix, value.name )
                else:
                    field_name = value.name
                if isinstance( declared, str ):
                    expanded_inputs[ field_name ] = declared.split( "," )
                else:
                    expanded_inputs[ field_name ] = declared
        return expanded_inputs
---|
138 | |
---|
def build_tests():
    """
    Generate one `ToolTestCase` subclass per tool that defines tests.

    Does nothing while the module level variable `toolbox` is None.
    Otherwise every tool in `toolbox.tools_by_id` that has tests gets a class
    named `TestForTool_<tool id>` (spaces replaced by underscores) holding one
    `test_tool_NNNNNN` method per test; the class is stored in this module's
    globals() so it can be discovered by nose.
    """
    if toolbox is None:
        return

    def make_test_method( td ):
        # Bind `td` through a factory call so each generated method keeps its
        # own test definition instead of the loop's last one.
        def test_tool( self ):
            self.do_it( td )
        return test_tool

    # Push all the toolbox tests to module level
    module_namespace = globals()
    for tool_id in toolbox.tools_by_id:
        tool = toolbox.tools_by_id[ tool_id ]
        if not tool.tests:
            continue
        # Create a new subclass of ToolTestCase dynamically adding methods
        # named test_tool_XXX that run each test defined in the tool.
        class_name = "TestForTool_" + tool.id.replace( ' ', '_' )
        methods = {}
        for index, testdef in enumerate( tool.tests ):
            method = make_test_method( testdef )
            method.__doc__ = "%s ( %s ) > %s" % ( tool.name, tool.id, testdef.name )
            methods[ 'test_tool_%06d' % index ] = method
        module_namespace[ class_name ] = new.classobj( class_name, ( ToolTestCase, ), methods )
---|
166 | |
---|