| 1 | import sys, new |
|---|
| 2 | from galaxy.tools.parameters import grouping |
|---|
| 3 | from galaxy.tools.parameters import basic |
|---|
| 4 | from base.twilltestcase import TwillTestCase |
|---|
| 5 | import galaxy.model |
|---|
| 6 | from galaxy.model.orm import * |
|---|
| 7 | from galaxy.model.mapping import context as sa_session |
|---|
| 8 | |
|---|
# Toolbox whose tools' tests are turned into test classes by build_tests();
# while it is None, build_tests() returns without generating anything.
# NOTE(review): presumably assigned by the test runner before build_tests()
# is called -- confirm against the caller.
toolbox = None
|---|
| 10 | |
|---|
class ToolTestCase( TwillTestCase ):
    """Abstract test case that runs tests based on a `galaxy.tools.test.ToolTest`"""

    def do_it( self, testdef ):
        """
        Run the tool test described by `testdef` in a freshly created history.

        The steps are: log in as the test user, create a new history, upload
        every file listed in `testdef.required_files` (applying any requested
        renames), submit each page of the tool form, verify every dataset
        listed in `testdef.outputs` and finally delete the history.  The
        history is only deleted when every previous step succeeded.

        Raises `testdef.exception` (or a generic `Exception` when none was
        recorded) if `testdef.error` is set, `AssertionError` if the latest
        history cannot be found or the new history is not empty, and
        `Exception` for an `edit_attributes` entry whose type is not 'name'.
        """
        # If the test generation had an error, raise
        if testdef.error:
            if testdef.exception:
                raise testdef.exception
            raise Exception( "Test parse failure" )
        # Start with a new history
        self.logout()
        self.login( email='test@bx.psu.edu' )
        admin_user = sa_session.query( galaxy.model.User ).filter( galaxy.model.User.table.c.email=='test@bx.psu.edu' ).one()
        self.new_history()
        # The most recently created, non-deleted history of the user is the one
        # made just above; its id is needed to delete it at the end.
        latest_history = sa_session.query( galaxy.model.History ) \
                                   .filter( and_( galaxy.model.History.table.c.deleted==False,
                                                  galaxy.model.History.table.c.user_id==admin_user.id ) ) \
                                   .order_by( desc( galaxy.model.History.table.c.create_time ) ) \
                                   .first()
        assert latest_history is not None, "Problem retrieving latest_history from database"
        if len( self.get_history_as_data_list() ) > 0:
            raise AssertionError("ToolTestCase.do_it failed")
        # Upload any needed files
        for fname, extra in testdef.required_files:
            ftype = extra.get( 'ftype', 'auto' )
            metadata = extra.get( 'metadata', [] )
            composite_data = extra.get( 'composite_data', [] )
            self.upload_file( fname, ftype=ftype, dbkey=extra.get( 'dbkey', 'hg17' ), metadata = metadata, composite_data = composite_data )
            print( "Uploaded file:  %s , ftype:  %s , extra:  %s" % ( fname, ftype, extra ) )
            # Post upload attribute editing; currently only renaming is supported
            for edit_att in extra.get( 'edit_attributes', [] ):
                if edit_att.get( 'type', None ) == 'name':
                    new_name = edit_att.get( 'value', None )
                    assert new_name, 'You must supply the new dataset name as the value tag of the edit_attributes tag'
                    # The dataset uploaded just above is the last one in the history
                    hda_id = self.get_history_as_data_list()[-1].get( 'id' )
                    try:
                        self.edit_hda_attribute_info( hda_id = str(hda_id), new_name = new_name )
                    except Exception:
                        # Renaming is best effort: report the failure and carry on,
                        # but let SystemExit / KeyboardInterrupt propagate.
                        print( "### call to edit_hda failed for hda_id %s, new_name=%s" % ( hda_id, new_name ) )
                else:
                    raise Exception( 'edit_attributes type (%s) is unimplemented' % edit_att.get( 'type', None ) )
        # We need to handle the case where we've uploaded a valid compressed file since the upload
        # tool will have uncompressed it on the fly.
        all_inputs = {}
        for name, value, _ in testdef.inputs:
            if value:
                for end in ( '.zip', '.gz' ):
                    if value.endswith( end ):
                        # Slice the suffix off.  str.rstrip() treats its argument as a
                        # set of characters, so it would also eat trailing letters of
                        # the base name (e.g. "contig.gz" -> "conti").
                        value = value[ :-len( end ) ]
                        break
            all_inputs[ name ] = value
        # See if we have a grouping.Repeat element
        repeat_name = None
        for input_name, input_value in testdef.tool.inputs_by_page[0].items():
            if isinstance( input_value, grouping.Repeat ):
                repeat_name = input_name
                break
        # Check if we need to verify number of outputs created dynamically by tool.
        # None means "do not wait"; a number is the dataset count before the tool
        # runs.  A separate sentinel is required because a baseline of 0 is valid.
        if testdef.tool.force_history_refresh:
            job_finish_by_output_count = len( self.get_history_as_data_list() )
        else:
            job_finish_by_output_count = None
        # Do the first page
        page_inputs = self.__expand_grouping( testdef.tool.inputs_by_page[0], all_inputs )
        # Run the tool
        self.run_tool( testdef.tool.id, repeat_name=repeat_name, **page_inputs )
        print( "page_inputs (0) %s" % ( page_inputs, ) )
        # Do other pages if they exist
        for i in range( 1, testdef.tool.npages ):
            page_inputs = self.__expand_grouping( testdef.tool.inputs_by_page[i], all_inputs )
            self.submit_form( **page_inputs )
            print( "page_inputs (%i) %s" % ( i, page_inputs ) )
        # Check the results ( handles single or multiple tool outputs ).  Make sure to pass the correct hid.
        # The output datasets from the tool should be in the same order as the testdef.outputs.
        data_list = None
        while data_list is None:
            data_list = self.get_history_as_data_list()
            # When counting outputs, keep re-reading the history until at least
            # len( testdef.outputs ) datasets were added since the baseline.
            if job_finish_by_output_count is not None and len( testdef.outputs ) > ( len( data_list ) - job_finish_by_output_count ):
                data_list = None
        self.assertTrue( data_list )
        # The outputs are expected to be the last len( testdef.outputs ) history
        # items, so index them from the end with a negative offset.
        elem_index = 0 - len( testdef.outputs )
        for name, outfile, attributes in testdef.outputs:
            # Get the correct hid
            elem = data_list[ elem_index ]
            self.assertTrue( elem is not None )
            elem_hid = elem.get( 'hid' )
            elem_index += 1
            self.verify_dataset_correctness( outfile, hid=elem_hid, maxseconds=testdef.maxseconds, attributes=attributes )
        self.delete_history( id=self.security.encode_id( latest_history.id ) )

    def __expand_grouping( self, tool_inputs, declared_inputs, prefix='' ):
        """
        Flatten `tool_inputs` into a dict mapping form field names to values.

        `tool_inputs` maps names to tool input objects, `declared_inputs` maps
        parameter names to the values declared by the test and `prefix` is the
        field name prefix of the enclosing group ('' at the top level).

        Nested parameters get names of the form "<prefix>|<name>".  For a
        `grouping.Conditional` only the case whose value equals the declared
        value of the test parameter is expanded; for a `grouping.Repeat` a
        single repetition (index 0) is expanded.  Values that are `str` are
        split on "," into a list, anything else is passed through unchanged.
        Returns the flattened dict.
        """
        def qualify( name ):
            # Field name of `name` inside the group identified by `prefix`.
            if prefix:
                return "%s|%s" % ( prefix, name )
            return name

        expanded_inputs = {}
        for _key, value in tool_inputs.items():
            if isinstance( value, grouping.Conditional ):
                new_prefix = qualify( value.name )
                test_param_key = "%s|%s" % ( new_prefix, value.test_param.name )
                for case in value.cases:
                    if declared_inputs[ value.test_param.name ] == case.value:
                        if isinstance( case.value, str ):
                            expanded_inputs[ test_param_key ] = case.value.split( "," )
                        else:
                            expanded_inputs[ test_param_key ] = case.value
                        for input_name, input_value in case.inputs.items():
                            expanded_inputs.update( self.__expand_grouping( { input_name:input_value }, declared_inputs, prefix = new_prefix ) )
            elif isinstance( value, grouping.Repeat ):
                for repeat_index in range( 0, 1 ): # need to allow for and figure out how many repeats we have
                    # The prefix only depends on the repeat index, so build it once per repetition
                    new_prefix = qualify( "%s_%d" % ( value.name, repeat_index ) )
                    for _r_name, r_value in value.inputs.items():
                        expanded_inputs.update( self.__expand_grouping( { new_prefix : r_value }, declared_inputs, prefix = new_prefix ) )
            else:
                declared_value = declared_inputs[ value.name ]
                if isinstance( declared_value, str ):
                    declared_value = declared_value.split( "," )
                expanded_inputs[ qualify( value.name ) ] = declared_value
        return expanded_inputs
|---|
| 138 | |
|---|
def build_tests():
    """
    Publish one generated `ToolTestCase` subclass per tested tool.

    Does nothing while the module level variable `toolbox` is None.
    Otherwise, for every tool in `toolbox.tools_by_id` that defines tests, a
    class named "TestForTool_<tool id>" is created with one `test_tool_NNNNNN`
    method per test and stored in this module's globals() so it can be
    discovered by nose.
    """
    if toolbox is None:
        return

    def make_test_method( td ):
        # Factory: gives every generated method its own binding of the test definition.
        def test_tool( self ):
            self.do_it( td )
        return test_tool

    # Push all the toolbox tests to module level
    module_namespace = globals()
    for tool_id in toolbox.tools_by_id:
        tool = toolbox.tools_by_id[ tool_id ]
        if not tool.tests:
            continue
        # Spaces in the tool id are replaced by underscores in the class name.
        class_name = "TestForTool_" + tool.id.replace( ' ', '_' )
        methods = {}
        for index, testdef in enumerate( tool.tests ):
            method = make_test_method( testdef )
            method.__doc__ = "%s ( %s ) > %s" % ( tool.name, tool.id, testdef.name )
            methods[ 'test_tool_%06d' % index ] = method
        module_namespace[ class_name ] = new.classobj( class_name, ( ToolTestCase, ), methods )
|---|
| 166 | |
|---|