root/galaxy-central/test/functional/test_toolbox.py

Revision 2, 8.7 KB (committer: hatakeyama, 14 years ago)


import sys, new
from galaxy.tools.parameters import grouping
from galaxy.tools.parameters import basic
from base.twilltestcase import TwillTestCase
import galaxy.model
from galaxy.model.orm import *
from galaxy.model.mapping import context as sa_session

toolbox = None
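# NOTE ( added comment, not in the original revision ): the functional test
# runner is expected to replace this module-level `toolbox` with a real
# toolbox instance before build_tests() below is called.
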
class ToolTestCase( TwillTestCase ):
    """Abstract test case that runs tests based on a `galaxy.tools.test.ToolTest`"""
    def do_it( self, testdef ):
        # If the test generation had an error, raise
        if testdef.error:
            if testdef.exception:
                raise testdef.exception
            else:
                raise Exception( "Test parse failure" )
        # Start with a new history
        self.logout()
        self.login( email='test@bx.psu.edu' )
        admin_user = sa_session.query( galaxy.model.User ).filter( galaxy.model.User.table.c.email=='test@bx.psu.edu' ).one()
        self.new_history()
        latest_history = sa_session.query( galaxy.model.History ) \
                                   .filter( and_( galaxy.model.History.table.c.deleted==False,
                                                  galaxy.model.History.table.c.user_id==admin_user.id ) ) \
                                   .order_by( desc( galaxy.model.History.table.c.create_time ) ) \
                                   .first()
        assert latest_history is not None, "Problem retrieving latest_history from database"
        if len( self.get_history_as_data_list() ) > 0:
            raise AssertionError( "ToolTestCase.do_it failed: the new history already contains datasets" )
        # Upload any needed files
        for fname, extra in testdef.required_files:
            children = extra.get( 'children', [] )
            metadata = extra.get( 'metadata', [] )
            composite_data = extra.get( 'composite_data', [] )
            self.upload_file( fname, ftype=extra.get( 'ftype', 'auto' ), dbkey=extra.get( 'dbkey', 'hg17' ), metadata=metadata, composite_data=composite_data )
            print "Uploaded file: ", fname, ", ftype: ", extra.get( 'ftype', 'auto' ), ", extra: ", extra
            # Post-upload attribute editing; currently only renaming is supported
            edit_attributes = extra.get( 'edit_attributes', [] )
            for edit_att in edit_attributes:
                if edit_att.get( 'type', None ) == 'name':
                    new_name = edit_att.get( 'value', None )
                    assert new_name, 'You must supply the new dataset name as the value tag of the edit_attributes tag'
                    hda_id = self.get_history_as_data_list()[-1].get( 'id' )
                    try:
                        self.edit_hda_attribute_info( hda_id=str( hda_id ), new_name=new_name )
                    except:
                        print "### call to edit_hda_attribute_info failed for hda_id %s, new_name=%s" % ( hda_id, new_name )
                else:
                    raise Exception( 'edit_attributes type (%s) is unimplemented' % edit_att.get( 'type', None ) )
        # We need to handle the case where we've uploaded a valid compressed file since the upload
        # tool will have uncompressed it on the fly.
        all_inputs = {}
        for name, value, _ in testdef.inputs:
            if value:
                for end in [ '.zip', '.gz' ]:
                    if value.endswith( end ):
                        # Slice off the suffix; str.rstrip( end ) would strip any run of
                        # trailing characters from the set, not the suffix itself.
                        value = value[ : -len( end ) ]
                        break
            all_inputs[ name ] = value
        # See if we have a grouping.Repeat element
        repeat_name = None
        for input_name, input_value in testdef.tool.inputs_by_page[0].items():
            if isinstance( input_value, grouping.Repeat ):
                repeat_name = input_name
                break
        # Check whether we need to verify the number of outputs the tool creates dynamically
        if testdef.tool.force_history_refresh:
            job_finish_by_output_count = len( self.get_history_as_data_list() )
        else:
            job_finish_by_output_count = False
        # Do the first page
        page_inputs = self.__expand_grouping( testdef.tool.inputs_by_page[0], all_inputs )
        # Run the tool
        self.run_tool( testdef.tool.id, repeat_name=repeat_name, **page_inputs )
        print "page_inputs (0)", page_inputs
        # Do other pages if they exist
        for i in range( 1, testdef.tool.npages ):
            page_inputs = self.__expand_grouping( testdef.tool.inputs_by_page[i], all_inputs )
            self.submit_form( **page_inputs )
            print "page_inputs (%i)" % i, page_inputs
        # Check the results ( handles single or multiple tool outputs ).  Make sure to pass the correct hid.
        # The output datasets from the tool should be in the same order as the testdef.outputs.
        data_list = None
        while data_list is None:
            data_list = self.get_history_as_data_list()
            if job_finish_by_output_count and len( testdef.outputs ) > ( len( data_list ) - job_finish_by_output_count ):
                data_list = None
        self.assertTrue( data_list )
        elem_index = 0 - len( testdef.outputs )
        for output_tuple in testdef.outputs:
            name, outfile, attributes = output_tuple
            # Get the correct hid
            elem = data_list[ elem_index ]
            self.assertTrue( elem is not None )
            elem_hid = elem.get( 'hid' )
            elem_index += 1
            self.verify_dataset_correctness( outfile, hid=elem_hid, maxseconds=testdef.maxseconds, attributes=attributes )
        self.delete_history( id=self.security.encode_id( latest_history.id ) )

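    # Hypothetical illustration ( invented example values, not from the original
    # revision ) of the shapes do_it() unpacks from the parsed test definition:
    #     testdef.required_files -> [ ( '1.bed', { 'ftype': 'bed', 'dbkey': 'hg17' } ) ]
    #     testdef.inputs         -> [ ( 'input1', '1.bed', extra ) ]
    #     testdef.outputs        -> [ ( 'out_file1', 'sorted.bed', attributes ) ]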
    def __expand_grouping( self, tool_inputs, declared_inputs, prefix='' ):
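        """
        Flatten Galaxy's grouped tool inputs ( Conditional, Repeat ) into the
        flat, pipe-separated form field names the tool form expects, taking
        the values to submit from `declared_inputs`.

        Hypothetical illustration ( names invented ): a Conditional named
        'source_cond' with test param 'source' and a nested param 'input1',
        given declared_inputs { 'source': 'history', 'input1': '1.bed' },
        expands to { 'source_cond|source': [ 'history' ],
        'source_cond|input1': [ '1.bed' ] } ( string values are comma-split ).
        """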
        expanded_inputs = {}
        for key, value in tool_inputs.items():
            if isinstance( value, grouping.Conditional ):
                if prefix:
                    new_prefix = "%s|%s" % ( prefix, value.name )
                else:
                    new_prefix = value.name
                for i, case in enumerate( value.cases ):
                    if declared_inputs[ value.test_param.name ] == case.value:
                        if isinstance( case.value, str ):
                            expanded_inputs[ "%s|%s" % ( new_prefix, value.test_param.name ) ] = case.value.split( "," )
                        else:
                            expanded_inputs[ "%s|%s" % ( new_prefix, value.test_param.name ) ] = case.value
                        for input_name, input_value in case.inputs.items():
                            expanded_inputs.update( self.__expand_grouping( { input_name: input_value }, declared_inputs, prefix=new_prefix ) )
            elif isinstance( value, grouping.Repeat ):
                for repeat_index in xrange( 0, 1 ): # Need to allow for and figure out how many repeats we have
                    for r_name, r_value in value.inputs.iteritems():
                        new_prefix = "%s_%d" % ( value.name, repeat_index )
                        if prefix:
                            new_prefix = "%s|%s" % ( prefix, new_prefix )
                        expanded_inputs.update( self.__expand_grouping( { new_prefix: r_value }, declared_inputs, prefix=new_prefix ) )
            elif isinstance( declared_inputs[ value.name ], str ):
                if prefix:
                    expanded_inputs[ "%s|%s" % ( prefix, value.name ) ] = declared_inputs[ value.name ].split( "," )
                else:
                    expanded_inputs[ value.name ] = declared_inputs[ value.name ].split( "," )
            else:
                if prefix:
                    expanded_inputs[ "%s|%s" % ( prefix, value.name ) ] = declared_inputs[ value.name ]
                else:
                    expanded_inputs[ value.name ] = declared_inputs[ value.name ]
        return expanded_inputs

def build_tests():
    """
    If the module level variable `toolbox` is set, generate `ToolTestCase`
    classes for all of its tests and put them into this module's globals() so
    they can be discovered by nose.
    """
    if toolbox is None:
        return
    # Push all the toolbox tests to module level
    G = globals()
    for i, tool_id in enumerate( toolbox.tools_by_id ):
        tool = toolbox.tools_by_id[ tool_id ]
        if tool.tests:
            # Create a new subclass of ToolTestCase, dynamically adding methods
            # named test_tool_XXX that run each test defined in the tool.
            n = "TestForTool_" + tool.id.replace( ' ', '_' )
            s = ( ToolTestCase, )
            d = dict()
            for j, testdef in enumerate( tool.tests ):
                def make_test_method( td ):
                    def test_tool( self ):
                        self.do_it( td )
                    return test_tool
                m = make_test_method( testdef )
                m.__doc__ = "%s ( %s ) > %s" % ( tool.name, tool.id, testdef.name )
                d['test_tool_%06d' % j] = m
            G[ n ] = new.classobj( n, s, d )
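
# Hypothetical usage sketch ( not part of the original revision ): the
# functional test runner is expected to inject the application's toolbox and
# build the test classes before nose collects them, along these lines:
#
#     import functional.test_toolbox
#     functional.test_toolbox.toolbox = app.toolbox
#     functional.test_toolbox.build_tests()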