root/galaxy-central/lib/galaxy/tools/actions/__init__.py

リビジョン 2, 21.1 KB (コミッタ: hatakeyama, 14 年 前)

import galaxy-central

行番号 
1from galaxy.util.bunch import Bunch
2from galaxy.util.odict import odict
3from galaxy.tools.parameters import *
4from galaxy.tools.parameters.grouping import *
5from galaxy.util.template import fill_template
6from galaxy.util.none_like import NoneDataset
7from galaxy.web import url_for
8import galaxy.tools
9from types import *
10
11import logging
12log = logging.getLogger( __name__ )
13
14class ToolAction( object ):
15    """
16    The actions to be taken when a tool is run (after parameters have
17    been converted and validated).
18    """
19    def execute( self, tool, trans, incoming={}, set_output_hid=True ):
20        raise TypeError("Abstract method")
21   
22class DefaultToolAction( object ):
23    """Default tool action is to run an external command"""
24   
25    def collect_input_datasets( self, tool, param_values, trans ):
26        """
27        Collect any dataset inputs from incoming. Returns a mapping from
28        parameter name to Dataset instance for each tool parameter that is
29        of the DataToolParameter type.
30        """
31        input_datasets = dict()
32        def visitor( prefix, input, value, parent = None ):
33            def process_dataset( data, formats = None ):
34                if formats is None:
35                    formats = input.formats
36                if data and not isinstance( data.datatype, formats ):
37                    # Need to refresh in case this conversion just took place, i.e. input above in tool performed the same conversion
38                    trans.sa_session.refresh( data )
39                    target_ext, converted_dataset = data.find_conversion_destination( formats, converter_safe = input.converter_safe( param_values, trans ) )
40                    if target_ext:
41                        if converted_dataset:
42                            data = converted_dataset
43                        else:
44                            #run converter here
45                            assoc = trans.app.model.ImplicitlyConvertedDatasetAssociation( parent = data, file_type = target_ext, metadata_safe = False )
46                            new_data = data.datatype.convert_dataset( trans, data, target_ext, return_output = True, visible = False ).values()[0]
47                            new_data.hid = data.hid
48                            new_data.name = data.name
49                            trans.sa_session.add( new_data )
50                            trans.sa_session.flush()
51                            assoc.dataset = new_data
52                            trans.sa_session.add( assoc )
53                            trans.sa_session.flush()
54                            data = new_data
55                current_user_roles = trans.get_current_user_roles()
56                if data and not trans.app.security_agent.can_access_dataset( current_user_roles, data.dataset ):
57                    raise "User does not have permission to use a dataset (%s) provided for input." % data.id
58                return data
59            if isinstance( input, DataToolParameter ):
60                if isinstance( value, list ):
61                    # If there are multiple inputs with the same name, they
62                    # are stored as name1, name2, ...
63                    for i, v in enumerate( value ):
64                        input_datasets[ prefix + input.name + str( i + 1 ) ] = process_dataset( v )
65                        conversions = []
66                        for conversion_name, conversion_extensions, conversion_datatypes in input.conversions:
67                            new_data = process_dataset( input_datasets[ prefix + input.name + str( i + 1 ) ], conversion_datatypes )
68                            if not new_data or isinstance( new_data.datatype, conversion_datatypes ):
69                                input_datasets[ prefix + conversion_name + str( i + 1 ) ] = new_data
70                                conversions.append( ( conversion_name, new_data ) )
71                            else:
72                                raise Exception, 'A path for explicit datatype conversion has not been found: %s --/--> %s' % ( input_datasets[ prefix + input.name + str( i + 1 ) ].extension, conversion_extensions )
73                        if parent:
74                            parent[input.name] = input_datasets[ prefix + input.name + str( i + 1 ) ]
75                            for conversion_name, conversion_data in conversions:
76                                #allow explicit conversion to be stored in job_parameter table
77                                parent[ conversion_name ] = conversion_data.id #a more robust way to determine JSONable value is desired
78                        else:
79                            param_values[input.name][i] = input_datasets[ prefix + input.name + str( i + 1 ) ]
80                            for conversion_name, conversion_data in conversions:
81                                #allow explicit conversion to be stored in job_parameter table
82                                param_values[ conversion_name ][i] = conversion_data.id #a more robust way to determine JSONable value is desired
83                else:
84                    input_datasets[ prefix + input.name ] = process_dataset( value )
85                    conversions = []
86                    for conversion_name, conversion_extensions, conversion_datatypes in input.conversions:
87                        new_data = process_dataset( input_datasets[ prefix + input.name ], conversion_datatypes )
88                        if not new_data or isinstance( new_data.datatype, conversion_datatypes ):
89                            input_datasets[ prefix + conversion_name ] = new_data
90                            conversions.append( ( conversion_name, new_data ) )
91                        else:
92                            raise Exception, 'A path for explicit datatype conversion has not been found: %s --/--> %s' % ( input_datasets[ prefix + input.name ].extension, conversion_extensions )
93                    target_dict = parent
94                    if not target_dict:
95                        target_dict = param_values
96                    target_dict[ input.name ] = input_datasets[ prefix + input.name ]
97                    for conversion_name, conversion_data in conversions:
98                        #allow explicit conversion to be stored in job_parameter table
99                        target_dict[ conversion_name ] = conversion_data.id #a more robust way to determine JSONable value is desired
100        tool.visit_inputs( param_values, visitor )
101        return input_datasets
102
103    def execute(self, tool, trans, incoming={}, return_job=False, set_output_hid=True ):
104        def make_dict_copy( from_dict ):
105            """
106            Makes a copy of input dictionary from_dict such that all values that are dictionaries
107            result in creation of a new dictionary ( a sort of deepcopy ).  We may need to handle
108            other complex types ( e.g., lists, etc ), but not sure...
109            Yes, we need to handle lists (and now are)...
110            """
111            copy_from_dict = {}
112            for key, value in from_dict.items():
113                if type( value ).__name__ == 'dict':
114                    copy_from_dict[ key ] = make_dict_copy( value )
115                elif isinstance( value, list ):
116                    copy_from_dict[ key ] = make_list_copy( value )
117                else:
118                    copy_from_dict[ key ] = value
119            return copy_from_dict
120        def make_list_copy( from_list ):
121            new_list = []
122            for value in from_list:
123                if isinstance( value, dict ):
124                    new_list.append( make_dict_copy( value ) )
125                elif isinstance( value, list ):
126                    new_list.append( make_list_copy( value ) )
127                else:
128                    new_list.append( value )
129            return new_list
130        def wrap_values( inputs, input_values ):
131            # Wrap tool inputs as necessary
132            for input in inputs.itervalues():
133                if isinstance( input, Repeat ):
134                    for d in input_values[ input.name ]:
135                        wrap_values( input.inputs, d )
136                elif isinstance( input, Conditional ):
137                    values = input_values[ input.name ]
138                    current = values[ "__current_case__" ]
139                    wrap_values( input.cases[current].inputs, values )
140                elif isinstance( input, DataToolParameter ):
141                    input_values[ input.name ] = \
142                        galaxy.tools.DatasetFilenameWrapper( input_values[ input.name ],
143                                                             datatypes_registry = trans.app.datatypes_registry,
144                                                             tool = tool,
145                                                             name = input.name )
146                elif isinstance( input, SelectToolParameter ):
147                    input_values[ input.name ] = galaxy.tools.SelectToolParameterWrapper( input, input_values[ input.name ], tool.app, other_values = incoming )
148                else:
149                    input_values[ input.name ] = galaxy.tools.InputValueWrapper( input, input_values[ input.name ], incoming )
150        out_data = odict()
151        # Collect any input datasets from the incoming parameters
152        inp_data = self.collect_input_datasets( tool, incoming, trans )
153
154        # Deal with input dataset names, 'dbkey' and types
155        input_names = []
156        input_ext = 'data'
157        input_dbkey = incoming.get( "dbkey", "?" )
158        for name, data in inp_data.items():
159            if data:
160                input_names.append( 'data %s' % data.hid )
161                input_ext = data.ext
162            else:
163                data = NoneDataset( datatypes_registry = trans.app.datatypes_registry )
164            if data.dbkey not in [None, '?']:
165                input_dbkey = data.dbkey
166
167        # Collect chromInfo dataset and add as parameters to incoming
168        db_datasets = {}
169        db_dataset = trans.db_dataset_for( input_dbkey )
170        if db_dataset:
171            db_datasets[ "chromInfo" ] = db_dataset
172            incoming[ "chromInfo" ] = db_dataset.file_name
173        else:
174            incoming[ "chromInfo" ] = os.path.join( trans.app.config.tool_data_path, 'shared','ucsc','chrom', "%s.len" % input_dbkey )
175        inp_data.update( db_datasets )
176       
177        # Determine output dataset permission/roles list
178        existing_datasets = [ inp for inp in inp_data.values() if inp ]
179        if existing_datasets:
180            output_permissions = trans.app.security_agent.guess_derived_permissions_for_datasets( existing_datasets )
181        else:
182            # No valid inputs, we will use history defaults
183            output_permissions = trans.app.security_agent.history_get_default_permissions( trans.history )
184        # Build name for output datasets based on tool name and input names
185        if len( input_names ) == 1:
186            on_text = input_names[0]
187        elif len( input_names ) == 2:
188            on_text = '%s and %s' % tuple(input_names[0:2])
189        elif len( input_names ) == 3:
190            on_text = '%s, %s, and %s' % tuple(input_names[0:3])
191        elif len( input_names ) > 3:
192            on_text = '%s, %s, and others' % tuple(input_names[0:2])
193        else:
194            on_text = ""
195        # Add the dbkey to the incoming parameters
196        incoming[ "dbkey" ] = input_dbkey
197        params = None #wrapped params are used by change_format action and by output.label; only perform this wrapping once, as needed
198        # Keep track of parent / child relationships, we'll create all the
199        # datasets first, then create the associations
200        parent_to_child_pairs = []
201        child_dataset_names = set()
202        for name, output in tool.outputs.items():
203            for filter in output.filters:
204                try:
205                    if not eval( filter.text, globals(), incoming ):
206                        break #do not create this dataset
207                except Exception, e:
208                    log.debug( 'Dataset output filter failed: %s' % e )
209            else: #all filters passed
210                if output.parent:
211                    parent_to_child_pairs.append( ( output.parent, name ) )
212                    child_dataset_names.add( name )
213                ## What is the following hack for? Need to document under what
214                ## conditions can the following occur? (james@bx.psu.edu)
215                # HACK: the output data has already been created
216                #      this happens i.e. as a result of the async controller
217                if name in incoming:
218                    dataid = incoming[name]
219                    data = trans.sa_session.query( trans.app.model.HistoryDatasetAssociation ).get( dataid )
220                    assert data != None
221                    out_data[name] = data
222                else:
223                    # the type should match the input
224                    ext = output.format
225                    if ext == "input":
226                        ext = input_ext
227                    #process change_format tags
228                    if output.change_format:
229                        if params is None:
230                            params = make_dict_copy( incoming )
231                            wrap_values( tool.inputs, params )
232                        for change_elem in output.change_format:
233                            for when_elem in change_elem.findall( 'when' ):
234                                check = when_elem.get( 'input', None )
235                                if check is not None:
236                                    try:
237                                        if '$' not in check:
238                                            #allow a simple name or more complex specifications
239                                            check = '${%s}' % check
240                                        if str( fill_template( check, context = params ) ) == when_elem.get( 'value', None ):
241                                            ext = when_elem.get( 'format', ext )
242                                    except: #bad tag input value; possibly referencing a param within a different conditional when block or other nonexistent grouping construct
243                                        continue
244                                else:
245                                    check = when_elem.get( 'input_dataset', None )
246                                    if check is not None:
247                                        check = inp_data.get( check, None )
248                                        if check is not None:
249                                            if str( getattr( check, when_elem.get( 'attribute' ) ) ) == when_elem.get( 'value', None ):
250                                                ext = when_elem.get( 'format', ext )
251                    data = trans.app.model.HistoryDatasetAssociation( extension=ext, create_dataset=True, sa_session=trans.sa_session )
252                    # Commit the dataset immediately so it gets database assigned unique id
253                    trans.sa_session.add( data )
254                    trans.sa_session.flush()
255                    trans.app.security_agent.set_all_dataset_permissions( data.dataset, output_permissions )
256                # Create an empty file immediately
257                open( data.file_name, "w" ).close()
258                # Fix permissions
259                util.umask_fix_perms( data.file_name, trans.app.config.umask, 0666 )
260                # This may not be neccesary with the new parent/child associations
261                data.designation = name
262                # Copy metadata from one of the inputs if requested.
263                if output.metadata_source:
264                    data.init_meta( copy_from=inp_data[output.metadata_source] )
265                else:
266                    data.init_meta()
267                # Take dbkey from LAST input
268                data.dbkey = str(input_dbkey)
269                # Set state
270                # FIXME: shouldn't this be NEW until the job runner changes it?
271                data.state = data.states.QUEUED
272                data.blurb = "queued"
273                # Set output label
274                if output.label:
275                    if params is None:
276                        params = make_dict_copy( incoming )
277                        # wrapping the params allows the tool config to contain things like
278                        # <outputs>
279                        #     <data format="input" name="output" label="Blat on ${<input_param>.name}" />
280                        # </outputs>
281                        wrap_values( tool.inputs, params )
282                    #tool (only needing to be set once) and on_string (set differently for each label) are overwritten for each output dataset label being determined
283                    params['tool'] = tool
284                    params['on_string'] = on_text
285                    data.name = fill_template( output.label, context=params )
286                else:
287                    data.name = tool.name
288                    if on_text:
289                        data.name += ( " on " + on_text )
290                # Store output
291                out_data[ name ] = data
292                if output.actions:
293                    #Apply pre-job tool-output-dataset actions; e.g. setting metadata, changing format
294                    output_action_params = dict( out_data )
295                    output_action_params.update( incoming )
296                    output.actions.apply_action( data, output_action_params )
297                # Store all changes to database
298                trans.sa_session.flush()
299        # Add all the top-level (non-child) datasets to the history
300        for name in out_data.keys():
301            if name not in child_dataset_names and name not in incoming: #don't add children; or already existing datasets, i.e. async created
302                data = out_data[ name ]
303                trans.history.add_dataset( data, set_hid = set_output_hid )
304                trans.sa_session.add( data )
305                trans.sa_session.flush()
306        # Add all the children to their parents
307        for parent_name, child_name in parent_to_child_pairs:
308            parent_dataset = out_data[ parent_name ]
309            child_dataset = out_data[ child_name ]
310            parent_dataset.children.append( child_dataset )
311        # Store data after custom code runs
312        trans.sa_session.flush()
313        # Create the job object
314        job = trans.app.model.Job()
315        galaxy_session = trans.get_galaxy_session()
316        # If we're submitting from the API, there won't be a session.
317        if type( galaxy_session ) == trans.model.GalaxySession:
318            job.session_id = galaxy_session.id
319        if trans.user is not None:
320            job.user_id = trans.user.id
321        job.history_id = trans.history.id
322        job.tool_id = tool.id
323        try:
324            # For backward compatibility, some tools may not have versions yet.
325            job.tool_version = tool.version
326        except:
327            job.tool_version = "1.0.0"
328        # FIXME: Don't need all of incoming here, just the defined parameters
329        #        from the tool. We need to deal with tools that pass all post
330        #        parameters to the command as a special case.
331        for name, value in tool.params_to_strings( incoming, trans.app ).iteritems():
332            job.add_parameter( name, value )
333        current_user_roles = trans.get_current_user_roles()
334        for name, dataset in inp_data.iteritems():
335            if dataset:
336                if not trans.app.security_agent.can_access_dataset( current_user_roles, dataset.dataset ):
337                    raise "User does not have permission to use a dataset (%s) provided for input." % data.id
338                job.add_input_dataset( name, dataset )
339            else:
340                job.add_input_dataset( name, None )
341        for name, dataset in out_data.iteritems():
342            job.add_output_dataset( name, dataset )
343        trans.sa_session.add( job )
344        trans.sa_session.flush()
345        # Some tools are not really executable, but jobs are still created for them ( for record keeping ).
346        # Examples include tools that redirect to other applications ( epigraph ).  These special tools must
347        # include something that can be retrieved from the params ( e.g., REDIRECT_URL ) to keep the job
348        # from being queued.
349        if 'REDIRECT_URL' in incoming:
350            # Get the dataset - there should only be 1
351            for name in inp_data.keys():
352                dataset = inp_data[ name ]
353            redirect_url = tool.parse_redirect_url( dataset, incoming )
354            # GALAXY_URL should be include in the tool params to enable the external application
355            # to send back to the current Galaxy instance
356            GALAXY_URL = incoming.get( 'GALAXY_URL', None )
357            assert GALAXY_URL is not None, "GALAXY_URL parameter missing in tool config."
358            redirect_url += "&GALAXY_URL=%s" % GALAXY_URL
359            # Job should not be queued, so set state to ok
360            job.state = trans.app.model.Job.states.OK
361            job.info = "Redirected to: %s" % redirect_url
362            trans.sa_session.add( job )
363            trans.sa_session.flush()
364            trans.response.send_redirect( url_for( controller='tool_runner', action='redirect', redirect_url=redirect_url ) )
365        else:
366            # Queue the job for execution
367            trans.app.job_queue.put( job.id, tool )
368            trans.log_event( "Added job to the job queue, id: %s" % str(job.id), tool_id=job.tool_id )
369            return job, out_data
Note: リポジトリブラウザについてのヘルプは TracBrowser を参照してください。