import os

from galaxy import util
from galaxy.util.bunch import Bunch
from galaxy.util.odict import odict
from galaxy.tools.parameters import *
from galaxy.tools.parameters.grouping import *
from galaxy.util.template import fill_template
from galaxy.util.none_like import NoneDataset
from galaxy.web import url_for
import galaxy.tools
from types import *

import logging
log = logging.getLogger( __name__ )

class ToolAction( object ):
    """
    The actions to be taken when a tool is run (after parameters have
    been converted and validated).
    """
    def execute( self, tool, trans, incoming={}, set_output_hid=True ):
        raise TypeError( "Abstract method" )

class DefaultToolAction( object ):
    """Default tool action is to run an external command"""

    def collect_input_datasets( self, tool, param_values, trans ):
        """
        Collect any dataset inputs from param_values. Returns a mapping from
        parameter name to Dataset instance for each tool parameter that is
        of the DataToolParameter type.
        """
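        # The tool's parameter tree is walked with a visitor; dataset parameters
        # are gathered into input_datasets keyed by prefixed parameter name
        # ( multi-input parameters get a 1-based suffix: name1, name2, ... ).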
        input_datasets = dict()
        def visitor( prefix, input, value, parent = None ):
            def process_dataset( data, formats = None ):
                if formats is None:
                    formats = input.formats
                if data and not isinstance( data.datatype, formats ):
                    # Refresh in case a conversion just took place, i.e. an input above in the tool performed the same conversion
                    trans.sa_session.refresh( data )
                    target_ext, converted_dataset = data.find_conversion_destination( formats, converter_safe = input.converter_safe( param_values, trans ) )
                    if target_ext:
                        if converted_dataset:
                            data = converted_dataset
                        else:
                            # Run the converter here
                            assoc = trans.app.model.ImplicitlyConvertedDatasetAssociation( parent = data, file_type = target_ext, metadata_safe = False )
                            new_data = data.datatype.convert_dataset( trans, data, target_ext, return_output = True, visible = False ).values()[0]
                            new_data.hid = data.hid
                            new_data.name = data.name
                            trans.sa_session.add( new_data )
                            trans.sa_session.flush()
                            assoc.dataset = new_data
                            trans.sa_session.add( assoc )
                            trans.sa_session.flush()
                            data = new_data
                current_user_roles = trans.get_current_user_roles()
                if data and not trans.app.security_agent.can_access_dataset( current_user_roles, data.dataset ):
                    raise Exception( "User does not have permission to use a dataset (%s) provided for input." % data.id )
                return data
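            # process_dataset has ensured each selected dataset matches one of
            # the parameter's accepted formats ( converting implicitly when
            # needed ); now dispatch on the parameter type and record datasets
            # under their prefixed names.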
            if isinstance( input, DataToolParameter ):
                if isinstance( value, list ):
                    # If there are multiple inputs with the same name, they
                    # are stored as name1, name2, ...
                    for i, v in enumerate( value ):
                        input_datasets[ prefix + input.name + str( i + 1 ) ] = process_dataset( v )
                        conversions = []
                        for conversion_name, conversion_extensions, conversion_datatypes in input.conversions:
                            new_data = process_dataset( input_datasets[ prefix + input.name + str( i + 1 ) ], conversion_datatypes )
                            if not new_data or isinstance( new_data.datatype, conversion_datatypes ):
                                input_datasets[ prefix + conversion_name + str( i + 1 ) ] = new_data
                                conversions.append( ( conversion_name, new_data ) )
                            else:
                                raise Exception( 'A path for explicit datatype conversion has not been found: %s --/--> %s' % ( input_datasets[ prefix + input.name + str( i + 1 ) ].extension, conversion_extensions ) )
                        if parent:
                            parent[ input.name ] = input_datasets[ prefix + input.name + str( i + 1 ) ]
                            for conversion_name, conversion_data in conversions:
                                # Allow explicit conversion to be stored in the job_parameter table
                                parent[ conversion_name ] = conversion_data.id # a more robust way to determine a JSONable value is desired
                        else:
                            param_values[ input.name ][ i ] = input_datasets[ prefix + input.name + str( i + 1 ) ]
                            for conversion_name, conversion_data in conversions:
                                # Allow explicit conversion to be stored in the job_parameter table
                                param_values[ conversion_name ][ i ] = conversion_data.id # a more robust way to determine a JSONable value is desired
                else:
                    input_datasets[ prefix + input.name ] = process_dataset( value )
                    conversions = []
                    for conversion_name, conversion_extensions, conversion_datatypes in input.conversions:
                        new_data = process_dataset( input_datasets[ prefix + input.name ], conversion_datatypes )
                        if not new_data or isinstance( new_data.datatype, conversion_datatypes ):
                            input_datasets[ prefix + conversion_name ] = new_data
                            conversions.append( ( conversion_name, new_data ) )
                        else:
                            raise Exception( 'A path for explicit datatype conversion has not been found: %s --/--> %s' % ( input_datasets[ prefix + input.name ].extension, conversion_extensions ) )
                    target_dict = parent
                    if not target_dict:
                        target_dict = param_values
                    target_dict[ input.name ] = input_datasets[ prefix + input.name ]
                    for conversion_name, conversion_data in conversions:
                        # Allow explicit conversion to be stored in the job_parameter table
                        target_dict[ conversion_name ] = conversion_data.id # a more robust way to determine a JSONable value is desired
        tool.visit_inputs( param_values, visitor )
        return input_datasets

    def execute( self, tool, trans, incoming=None, return_job=False, set_output_hid=True ):
        # Use a fresh dict rather than a mutable default argument, since
        # incoming is modified below ( e.g. dbkey and chromInfo are added ).
        if incoming is None:
            incoming = {}
        def make_dict_copy( from_dict ):
            """
            Makes a copy of input dictionary from_dict such that all values that are
            dictionaries or lists are recursively copied into new objects ( a sort of
            deepcopy ); all other values are shared with the original.
            """
            copy_from_dict = {}
            for key, value in from_dict.items():
                if isinstance( value, dict ):
                    copy_from_dict[ key ] = make_dict_copy( value )
                elif isinstance( value, list ):
                    copy_from_dict[ key ] = make_list_copy( value )
                else:
                    copy_from_dict[ key ] = value
            return copy_from_dict
        def make_list_copy( from_list ):
            new_list = []
            for value in from_list:
                if isinstance( value, dict ):
                    new_list.append( make_dict_copy( value ) )
                elif isinstance( value, list ):
                    new_list.append( make_list_copy( value ) )
                else:
                    new_list.append( value )
            return new_list
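        # e.g. make_dict_copy( { 'a': { 'b': 1 }, 'c': [ { 'd': 2 } ] } ) returns
        # a structure whose nested dict and list are fresh objects, while leaf
        # values like 1 and 2 are shared with the original.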
        def wrap_values( inputs, input_values ):
            # Wrap tool inputs as necessary
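            # Recurses through Repeat and Conditional groupings so that every
            # leaf parameter value is replaced in place by a wrapper object
            # suitable for template substitution ( e.g. in output labels ).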
            for input in inputs.itervalues():
                if isinstance( input, Repeat ):
                    for d in input_values[ input.name ]:
                        wrap_values( input.inputs, d )
                elif isinstance( input, Conditional ):
                    values = input_values[ input.name ]
                    current = values[ "__current_case__" ]
                    wrap_values( input.cases[ current ].inputs, values )
                elif isinstance( input, DataToolParameter ):
                    input_values[ input.name ] = \
                        galaxy.tools.DatasetFilenameWrapper( input_values[ input.name ],
                                                             datatypes_registry = trans.app.datatypes_registry,
                                                             tool = tool,
                                                             name = input.name )
                elif isinstance( input, SelectToolParameter ):
                    input_values[ input.name ] = galaxy.tools.SelectToolParameterWrapper( input, input_values[ input.name ], tool.app, other_values = incoming )
                else:
                    input_values[ input.name ] = galaxy.tools.InputValueWrapper( input, input_values[ input.name ], incoming )
        out_data = odict()
        # Collect any input datasets from the incoming parameters
        inp_data = self.collect_input_datasets( tool, incoming, trans )

        # Deal with input dataset names, 'dbkey' and types
        input_names = []
        input_ext = 'data'
        input_dbkey = incoming.get( "dbkey", "?" )
        for name, data in inp_data.items():
            if data:
                input_names.append( 'data %s' % data.hid )
                input_ext = data.ext
            else:
                data = NoneDataset( datatypes_registry = trans.app.datatypes_registry )
            if data.dbkey not in [ None, '?' ]:
                input_dbkey = data.dbkey
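        # Note that input_ext and input_dbkey reflect the last input that set
        # them; see "Take dbkey from LAST input" below.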

        # Collect chromInfo dataset and add as parameters to incoming
        db_datasets = {}
        db_dataset = trans.db_dataset_for( input_dbkey )
        if db_dataset:
            db_datasets[ "chromInfo" ] = db_dataset
            incoming[ "chromInfo" ] = db_dataset.file_name
        else:
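            # Fall back to the static chromosome-length file shipped with the
            # instance, e.g. <tool_data_path>/shared/ucsc/chrom/hg18.len when
            # input_dbkey is 'hg18' ( 'hg18' is just an example build here ).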
            incoming[ "chromInfo" ] = os.path.join( trans.app.config.tool_data_path, 'shared', 'ucsc', 'chrom', "%s.len" % input_dbkey )
        inp_data.update( db_datasets )

        # Determine output dataset permission/roles list
        existing_datasets = [ inp for inp in inp_data.values() if inp ]
        if existing_datasets:
            output_permissions = trans.app.security_agent.guess_derived_permissions_for_datasets( existing_datasets )
        else:
            # No valid inputs, we will use history defaults
            output_permissions = trans.app.security_agent.history_get_default_permissions( trans.history )
        # Build name for output datasets based on tool name and input names
        if len( input_names ) == 1:
            on_text = input_names[0]
        elif len( input_names ) == 2:
            on_text = '%s and %s' % tuple( input_names[0:2] )
        elif len( input_names ) == 3:
            on_text = '%s, %s, and %s' % tuple( input_names[0:3] )
        elif len( input_names ) > 3:
            on_text = '%s, %s, and others' % tuple( input_names[0:2] )
        else:
            on_text = ""
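        # e.g. on_text becomes "data 1", "data 1 and data 2",
        # "data 1, data 2, and data 3", or "data 1, data 2, and others"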
        # Add the dbkey to the incoming parameters
        incoming[ "dbkey" ] = input_dbkey
        # Wrapped params are used by the change_format action and by output.label;
        # only perform this wrapping once, as needed
        params = None
        # Keep track of parent / child relationships, we'll create all the
        # datasets first, then create the associations
        parent_to_child_pairs = []
        child_dataset_names = set()
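        # Each output's filters are Python expressions taken from the tool XML,
        # e.g. <filter>format_choice == 'pdf'</filter> ( format_choice being a
        # hypothetical parameter name ), evaluated with the incoming parameters
        # as the local namespace; an output is only created if all of its
        # filters pass.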
        for name, output in tool.outputs.items():
            for filter in output.filters:
                try:
                    if not eval( filter.text, globals(), incoming ):
                        break # do not create this dataset
                except Exception, e:
                    log.debug( 'Dataset output filter failed: %s' % e )
            else: # all filters passed
                if output.parent:
                    parent_to_child_pairs.append( ( output.parent, name ) )
                    child_dataset_names.add( name )
                ## What is the following hack for? Need to document under what
                ## conditions the following can occur. (james@bx.psu.edu)
                # HACK: the output data has already been created; this happens
                #       e.g. as a result of the async controller
                if name in incoming:
                    dataid = incoming[ name ]
                    data = trans.sa_session.query( trans.app.model.HistoryDatasetAssociation ).get( dataid )
                    assert data is not None
                    out_data[ name ] = data
                else:
                    # the type should match the input
                    ext = output.format
                    if ext == "input":
                        ext = input_ext
                    # Process change_format tags
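                    # A change_format tag switches the output's extension when a
                    # condition on the inputs holds, e.g. ( out_format being a
                    # hypothetical parameter name ):
                    #   <change_format>
                    #       <when input="out_format" value="interval" format="interval" />
                    #   </change_format>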
                    if output.change_format:
                        if params is None:
                            params = make_dict_copy( incoming )
                            wrap_values( tool.inputs, params )
                        for change_elem in output.change_format:
                            for when_elem in change_elem.findall( 'when' ):
                                check = when_elem.get( 'input', None )
                                if check is not None:
                                    try:
                                        if '$' not in check:
                                            # Allow a simple name or more complex specifications
                                            check = '${%s}' % check
                                        if str( fill_template( check, context = params ) ) == when_elem.get( 'value', None ):
                                            ext = when_elem.get( 'format', ext )
                                    except: # bad tag input value; possibly referencing a param within a different conditional when block or other nonexistent grouping construct
                                        continue
                                else:
                                    check = when_elem.get( 'input_dataset', None )
                                    if check is not None:
                                        check = inp_data.get( check, None )
                                        if check is not None:
                                            if str( getattr( check, when_elem.get( 'attribute' ) ) ) == when_elem.get( 'value', None ):
                                                ext = when_elem.get( 'format', ext )
                    data = trans.app.model.HistoryDatasetAssociation( extension=ext, create_dataset=True, sa_session=trans.sa_session )
                    # Commit the dataset immediately so it gets a database-assigned unique id
                    trans.sa_session.add( data )
                    trans.sa_session.flush()
                    trans.app.security_agent.set_all_dataset_permissions( data.dataset, output_permissions )
                # Create an empty file immediately
                open( data.file_name, "w" ).close()
                # Fix permissions
                util.umask_fix_perms( data.file_name, trans.app.config.umask, 0666 )
                # This may not be necessary with the new parent/child associations
                data.designation = name
                # Copy metadata from one of the inputs if requested.
                if output.metadata_source:
                    data.init_meta( copy_from=inp_data[ output.metadata_source ] )
                else:
                    data.init_meta()
                # Take dbkey from LAST input
                data.dbkey = str( input_dbkey )
                # Set state
                # FIXME: shouldn't this be NEW until the job runner changes it?
                data.state = data.states.QUEUED
                data.blurb = "queued"
                # Set output label
                if output.label:
                    if params is None:
                        params = make_dict_copy( incoming )
                        # Wrapping the params allows the tool config to contain things like
                        # <outputs>
                        #     <data format="input" name="output" label="Blat on ${<input_param>.name}" />
                        # </outputs>
                        wrap_values( tool.inputs, params )
                    # tool (only needing to be set once) and on_string (set differently for
                    # each label) are overwritten for each output dataset label being determined
                    params['tool'] = tool
                    params['on_string'] = on_text
                    data.name = fill_template( output.label, context=params )
                else:
                    data.name = tool.name
                    if on_text:
                        data.name += ( " on " + on_text )
                # Store output
                out_data[ name ] = data
                if output.actions:
                    # Apply pre-job tool-output-dataset actions, e.g. setting metadata or changing format
                    output_action_params = dict( out_data )
                    output_action_params.update( incoming )
                    output.actions.apply_action( data, output_action_params )
                # Store all changes to database
                trans.sa_session.flush()
        # Add all the top-level (non-child) datasets to the history
        for name in out_data.keys():
            if name not in child_dataset_names and name not in incoming: # don't add children or already-existing datasets, i.e. async-created
                data = out_data[ name ]
                trans.history.add_dataset( data, set_hid = set_output_hid )
                trans.sa_session.add( data )
                trans.sa_session.flush()
        # Add all the children to their parents
        for parent_name, child_name in parent_to_child_pairs:
            parent_dataset = out_data[ parent_name ]
            child_dataset = out_data[ child_name ]
            parent_dataset.children.append( child_dataset )
        # Store data after custom code runs
        trans.sa_session.flush()
        # Create the job object
        job = trans.app.model.Job()
        galaxy_session = trans.get_galaxy_session()
        # If we're submitting from the API, there won't be a session.
        if isinstance( galaxy_session, trans.model.GalaxySession ):
            job.session_id = galaxy_session.id
        if trans.user is not None:
            job.user_id = trans.user.id
        job.history_id = trans.history.id
        job.tool_id = tool.id
        try:
            # For backward compatibility, some tools may not have versions yet.
            job.tool_version = tool.version
        except:
            job.tool_version = "1.0.0"
        # FIXME: Don't need all of incoming here, just the defined parameters
        #        from the tool. We need to deal with tools that pass all post
        #        parameters to the command as a special case.
        for name, value in tool.params_to_strings( incoming, trans.app ).iteritems():
            job.add_parameter( name, value )
        current_user_roles = trans.get_current_user_roles()
        for name, dataset in inp_data.iteritems():
            if dataset:
                if not trans.app.security_agent.can_access_dataset( current_user_roles, dataset.dataset ):
                    raise Exception( "User does not have permission to use a dataset (%s) provided for input." % dataset.id )
                job.add_input_dataset( name, dataset )
            else:
                job.add_input_dataset( name, None )
        for name, dataset in out_data.iteritems():
            job.add_output_dataset( name, dataset )
        trans.sa_session.add( job )
        trans.sa_session.flush()
        # Some tools are not really executable, but jobs are still created for them ( for record keeping ).
        # Examples include tools that redirect to other applications ( epigraph ). These special tools must
        # include something that can be retrieved from the params ( e.g., REDIRECT_URL ) to keep the job
        # from being queued.
        if 'REDIRECT_URL' in incoming:
            # Get the dataset - there should only be 1
            for name in inp_data.keys():
                dataset = inp_data[ name ]
            redirect_url = tool.parse_redirect_url( dataset, incoming )
            # GALAXY_URL should be included in the tool params to enable the external application
            # to send back to the current Galaxy instance
            GALAXY_URL = incoming.get( 'GALAXY_URL', None )
            assert GALAXY_URL is not None, "GALAXY_URL parameter missing in tool config."
            redirect_url += "&GALAXY_URL=%s" % GALAXY_URL
            # Job should not be queued, so set state to ok
            job.state = trans.app.model.Job.states.OK
            job.info = "Redirected to: %s" % redirect_url
            trans.sa_session.add( job )
            trans.sa_session.flush()
            trans.response.send_redirect( url_for( controller='tool_runner', action='redirect', redirect_url=redirect_url ) )
        else:
            # Queue the job for execution
            trans.app.job_queue.put( job.id, tool )
            trans.log_event( "Added job to the job queue, id: %s" % str( job.id ), tool_id=job.tool_id )
            return job, out_data