[2] | 1 | import sys, new |
---|
| 2 | from galaxy.tools.parameters import grouping |
---|
| 3 | from galaxy.tools.parameters import basic |
---|
| 4 | from base.twilltestcase import TwillTestCase |
---|
| 5 | import galaxy.model |
---|
| 6 | from galaxy.model.orm import * |
---|
| 7 | from galaxy.model.mapping import context as sa_session |
---|
| 8 | |
---|
# Module-level toolbox read by build_tests(); while this is None,
# build_tests() returns without generating any test classes.
toolbox = None
---|
| 10 | |
---|
| 11 | class ToolTestCase( TwillTestCase ): |
---|
| 12 | """Abstract test case that runs tests based on a `galaxy.tools.test.ToolTest`""" |
---|
| 13 | def do_it( self, testdef ): |
---|
| 14 | # If the test generation had an error, raise |
---|
| 15 | if testdef.error: |
---|
| 16 | if testdef.exception: |
---|
| 17 | raise testdef.exception |
---|
| 18 | else: |
---|
| 19 | raise Exception( "Test parse failure" ) |
---|
| 20 | # Start with a new history |
---|
| 21 | self.logout() |
---|
| 22 | self.login( email='test@bx.psu.edu' ) |
---|
| 23 | admin_user = sa_session.query( galaxy.model.User ).filter( galaxy.model.User.table.c.email=='test@bx.psu.edu' ).one() |
---|
| 24 | self.new_history() |
---|
| 25 | latest_history = sa_session.query( galaxy.model.History ) \ |
---|
| 26 | .filter( and_( galaxy.model.History.table.c.deleted==False, |
---|
| 27 | galaxy.model.History.table.c.user_id==admin_user.id ) ) \ |
---|
| 28 | .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ |
---|
| 29 | .first() |
---|
| 30 | assert latest_history is not None, "Problem retrieving latest_history from database" |
---|
| 31 | if len( self.get_history_as_data_list() ) > 0: |
---|
| 32 | raise AssertionError("ToolTestCase.do_it failed") |
---|
| 33 | # Upload any needed files |
---|
| 34 | for fname, extra in testdef.required_files: |
---|
| 35 | children = extra.get( 'children', [] ) |
---|
| 36 | metadata = extra.get( 'metadata', [] ) |
---|
| 37 | composite_data = extra.get( 'composite_data', [] ) |
---|
| 38 | self.upload_file( fname, ftype=extra.get( 'ftype', 'auto' ), dbkey=extra.get( 'dbkey', 'hg17' ), metadata = metadata, composite_data = composite_data ) |
---|
| 39 | print "Uploaded file: ", fname, ", ftype: ", extra.get( 'ftype', 'auto' ), ", extra: ", extra |
---|
| 40 | #Post upload attribute editing |
---|
| 41 | edit_attributes = extra.get( 'edit_attributes', [] ) |
---|
| 42 | #currently only renaming is supported |
---|
| 43 | for edit_att in edit_attributes: |
---|
| 44 | if edit_att.get( 'type', None ) == 'name': |
---|
| 45 | new_name = edit_att.get( 'value', None ) |
---|
| 46 | assert new_name, 'You must supply the new dataset name as the value tag of the edit_attributes tag' |
---|
| 47 | hda_id = self.get_history_as_data_list()[-1].get( 'id' ) |
---|
| 48 | try: |
---|
| 49 | self.edit_hda_attribute_info( hda_id = str(hda_id), new_name = new_name ) |
---|
| 50 | except: |
---|
| 51 | print "### call to edit_hda failed for hda_id %s, new_name=%s" % (hda_id,new_name) |
---|
| 52 | else: |
---|
| 53 | raise Exception( 'edit_attributes type (%s) is unimplemented' % edit_att.get( 'type', None ) ) |
---|
| 54 | # We need to handle the case where we've uploaded a valid compressed file since the upload |
---|
| 55 | # tool will have uncompressed it on the fly. |
---|
| 56 | all_inputs = {} |
---|
| 57 | for name, value, _ in testdef.inputs: |
---|
| 58 | if value: |
---|
| 59 | for end in [ '.zip', '.gz' ]: |
---|
| 60 | if value.endswith( end ): |
---|
| 61 | value = value.rstrip( end ) |
---|
| 62 | break |
---|
| 63 | all_inputs[ name ] = value |
---|
| 64 | # See if we have a grouping.Repeat element |
---|
| 65 | repeat_name = None |
---|
| 66 | for input_name, input_value in testdef.tool.inputs_by_page[0].items(): |
---|
| 67 | if isinstance( input_value, grouping.Repeat ): |
---|
| 68 | repeat_name = input_name |
---|
| 69 | break |
---|
| 70 | #check if we need to verify number of outputs created dynamically by tool |
---|
| 71 | if testdef.tool.force_history_refresh: |
---|
| 72 | job_finish_by_output_count = len( self.get_history_as_data_list() ) |
---|
| 73 | else: |
---|
| 74 | job_finish_by_output_count = False |
---|
| 75 | # Do the first page |
---|
| 76 | page_inputs = self.__expand_grouping(testdef.tool.inputs_by_page[0], all_inputs) |
---|
| 77 | # Run the tool |
---|
| 78 | self.run_tool( testdef.tool.id, repeat_name=repeat_name, **page_inputs ) |
---|
| 79 | print "page_inputs (0)", page_inputs |
---|
| 80 | # Do other pages if they exist |
---|
| 81 | for i in range( 1, testdef.tool.npages ): |
---|
| 82 | page_inputs = self.__expand_grouping(testdef.tool.inputs_by_page[i], all_inputs) |
---|
| 83 | self.submit_form( **page_inputs ) |
---|
| 84 | print "page_inputs (%i)" % i, page_inputs |
---|
| 85 | # Check the results ( handles single or multiple tool outputs ). Make sure to pass the correct hid. |
---|
| 86 | # The output datasets from the tool should be in the same order as the testdef.outputs. |
---|
| 87 | data_list = None |
---|
| 88 | while data_list is None: |
---|
| 89 | data_list = self.get_history_as_data_list() |
---|
| 90 | if job_finish_by_output_count and len( testdef.outputs ) > ( len( data_list ) - job_finish_by_output_count ): |
---|
| 91 | data_list = None |
---|
| 92 | self.assertTrue( data_list ) |
---|
| 93 | elem_index = 0 - len( testdef.outputs ) |
---|
| 94 | for output_tuple in testdef.outputs: |
---|
| 95 | name, outfile, attributes = output_tuple |
---|
| 96 | # Get the correct hid |
---|
| 97 | elem = data_list[ elem_index ] |
---|
| 98 | self.assertTrue( elem is not None ) |
---|
| 99 | elem_hid = elem.get( 'hid' ) |
---|
| 100 | elem_index += 1 |
---|
| 101 | self.verify_dataset_correctness( outfile, hid=elem_hid, maxseconds=testdef.maxseconds, attributes=attributes ) |
---|
| 102 | self.delete_history( id=self.security.encode_id( latest_history.id ) ) |
---|
| 103 | |
---|
| 104 | def __expand_grouping( self, tool_inputs, declared_inputs, prefix='' ): |
---|
| 105 | expanded_inputs = {} |
---|
| 106 | for key, value in tool_inputs.items(): |
---|
| 107 | if isinstance( value, grouping.Conditional ): |
---|
| 108 | if prefix: |
---|
| 109 | new_prefix = "%s|%s" % ( prefix, value.name ) |
---|
| 110 | else: |
---|
| 111 | new_prefix = value.name |
---|
| 112 | for i, case in enumerate( value.cases ): |
---|
| 113 | if declared_inputs[ value.test_param.name ] == case.value: |
---|
| 114 | if isinstance(case.value, str): |
---|
| 115 | expanded_inputs[ "%s|%s" % ( new_prefix, value.test_param.name ) ] = case.value.split( "," ) |
---|
| 116 | else: |
---|
| 117 | expanded_inputs[ "%s|%s" % ( new_prefix, value.test_param.name ) ] = case.value |
---|
| 118 | for input_name, input_value in case.inputs.items(): |
---|
| 119 | expanded_inputs.update( self.__expand_grouping( { input_name:input_value }, declared_inputs, prefix = new_prefix ) ) |
---|
| 120 | elif isinstance( value, grouping.Repeat ): |
---|
| 121 | for repeat_index in xrange( 0, 1 ): #need to allow for and figure out how many repeats we have |
---|
| 122 | for r_name, r_value in value.inputs.iteritems(): |
---|
| 123 | new_prefix = "%s_%d" % ( value.name, repeat_index ) |
---|
| 124 | if prefix: |
---|
| 125 | new_prefix = "%s|%s" % ( prefix, new_prefix ) |
---|
| 126 | expanded_inputs.update( self.__expand_grouping( { new_prefix : r_value }, declared_inputs, prefix = new_prefix ) ) |
---|
| 127 | elif isinstance(declared_inputs[value.name], str): |
---|
| 128 | if prefix: |
---|
| 129 | expanded_inputs["%s|%s" % ( prefix, value.name ) ] = declared_inputs[value.name].split(",") |
---|
| 130 | else: |
---|
| 131 | expanded_inputs[value.name] = declared_inputs[value.name].split(",") |
---|
| 132 | else: |
---|
| 133 | if prefix: |
---|
| 134 | expanded_inputs["%s|%s" % ( prefix, value.name ) ] = declared_inputs[value.name] |
---|
| 135 | else: |
---|
| 136 | expanded_inputs[value.name] = declared_inputs[value.name] |
---|
| 137 | return expanded_inputs |
---|
| 138 | |
---|
def build_tests():
    """
    Generate one `ToolTestCase` subclass per tool that defines tests.

    Does nothing while the module level variable `toolbox` is None.
    Otherwise every tool in `toolbox.tools_by_id` with a non-empty `tests`
    gets a class named "TestForTool_<tool id, spaces replaced by '_'>" whose
    methods `test_tool_NNNNNN` each run one test definition through `do_it`.
    The classes are stored in this module's globals() so they can be
    discovered by nose.
    """
    if toolbox is None:
        return

    def make_test_method( td ):
        # Factory so every generated method keeps its own test definition.
        def test_tool( self ):
            self.do_it( td )
        return test_tool

    # Push all the toolbox tests to module level
    module_namespace = globals()
    for tool_id in toolbox.tools_by_id:
        tool = toolbox.tools_by_id[ tool_id ]
        if not tool.tests:
            continue
        # Build the attribute dict of a new ToolTestCase subclass: one
        # test_tool_XXX method per test defined in the tool.
        class_name = "TestForTool_" + tool.id.replace( ' ', '_' )
        class_attrs = {}
        for index, testdef in enumerate( tool.tests ):
            method = make_test_method( testdef )
            method.__doc__ = "%s ( %s ) > %s" % ( tool.name, tool.id, testdef.name )
            class_attrs[ 'test_tool_%06d' % index ] = method
        module_namespace[ class_name ] = new.classobj( class_name, ( ToolTestCase, ), class_attrs )
---|
| 166 | |
---|