        return trans.show_message(
            "Workflow '%s' created.<br/>"
            "You can <a href='%s' target='_parent'>edit</a> or run the workflow."
            % ( workflow_name, web.url_for( action='editor', id=trans.security.encode_id(stored.id) ) ) )

    @web.expose
    def run( self, trans, id, check_user=True, **kwargs ):
        stored = self.get_stored_workflow( trans, id, check_ownership=False )
        if check_user:
            user = trans.get_user()
            if stored.user != user:
                if trans.sa_session.query( model.StoredWorkflowUserShareAssociation ) \
                        .filter_by( user=user, stored_workflow=stored ).count() == 0:
                    error( "Workflow is not owned by or shared with current user" )
        # Get the latest revision
        workflow = stored.latest_workflow
        # It is possible for a workflow to have 0 steps
        if len( workflow.steps ) == 0:
            error( "Workflow cannot be run because it does not have any steps" )
        #workflow = Workflow.from_simple( simplejson.loads( stored.encoded_value ), trans.app )
        if workflow.has_cycles:
            error( "Workflow cannot be run because it contains cycles" )
        if workflow.has_errors:
            error( "Workflow cannot be run because of validation errors in some steps" )
        # Build the state for each step
        errors = {}
        has_upgrade_messages = False
        has_errors = False
        if kwargs:
            # If kwargs were provided, the states for each step should have
            # been POSTed
            for step in workflow.steps:
                step.upgrade_messages = {}
                # Connections by input name
                step.input_connections_by_name = \
                    dict( ( conn.input_name, conn ) for conn in step.input_connections )
                # Extract just the arguments for this step by prefix
                p = "%s|" % step.id
                l = len(p)
                step_args = dict( ( k[l:], v ) for ( k, v ) in kwargs.iteritems() if k.startswith( p ) )
                step_errors = None
                if step.type == 'tool' or step.type is None:
                    module = module_factory.from_workflow_step( trans, step )
                    # Fix any missing parameters
                    step.upgrade_messages = module.check_and_update_state()
                    if step.upgrade_messages:
                        has_upgrade_messages = True
                    # Any connected input needs to have value DummyDataset (these
                    # are not persisted so we need to do it every time)
                    module.add_dummy_datasets( connections=step.input_connections )
                    # Get the tool
                    tool = module.tool
                    # Get the state
                    step.state = state = module.state
                    # Get old errors
                    old_errors = state.inputs.pop( "__errors__", {} )
                    # Update the state
                    step_errors = tool.update_state( trans, tool.inputs, step.state.inputs, step_args,
                                                     update_only=True, old_errors=old_errors )
                else:
                    module = step.module = module_factory.from_workflow_step( trans, step )
                    state = step.state = module.decode_runtime_state( trans, step_args.pop( "tool_state" ) )
                    step_errors = module.update_runtime_state( trans, state, step_args )
                if step_errors:
                    errors[step.id] = state.inputs["__errors__"] = step_errors
            if 'run_workflow' in kwargs and not errors:
                # Run each step, connecting outputs to inputs
                workflow_invocation = model.WorkflowInvocation()
                workflow_invocation.workflow = workflow
                outputs = odict()
                # Find out if there are any workflow outputs defined, as that
                # influences our actions.
                use_workflow_outputs = False
                for step in workflow.steps:
                    if step.type == 'tool' or step.type is None:
                        if step.workflow_outputs:
                            use_workflow_outputs = True
                            break
                for i, step in enumerate( workflow.steps ):
                    # Execute module
                    job = None
                    if step.type == 'tool' or step.type is None:
                        tool = trans.app.toolbox.tools_by_id[ step.tool_id ]
                        input_values = step.state.inputs
                        # Connect up
                        def callback( input, value, prefixed_name, prefixed_label ):
                            if isinstance( input, DataToolParameter ):
                                if prefixed_name in step.input_connections_by_name:
                                    conn = step.input_connections_by_name[ prefixed_name ]
                                    return outputs[ conn.output_step.id ][ conn.output_name ]
                        visit_input_values( tool.inputs, step.state.inputs, callback )
                        # Execute it
                        job, out_data = tool.execute( trans, step.state.inputs )
                        outputs[ step.id ] = out_data
                        # Create new PJA associations with the created job, to be run on completion.
                        if use_workflow_outputs:
                            # We're using outputs. Check the step for outputs to be
                            # displayed. Create PJAs to hide the rest upon completion.
                            step_outputs = [ s.output_name for s in step.workflow_outputs ]
                            for output in tool.outputs.keys():
                                if output not in step_outputs:
                                    # Create a PJA for hiding this output.
                                    n_pja = PostJobAction( 'HideDatasetAction', step, output, {} )
                                else:
                                    # Remove any HideDatasetActions, step is flagged for output.
                                    for pja in step.post_job_actions:
                                        if pja.action_type == "HideDatasetAction" and pja.output_name == output:
                                            step.post_job_actions.remove( pja )
                                            trans.sa_session.delete( pja )
                        for pja in step.post_job_actions:
                            if pja.action_type in ActionBox.immediate_actions:
                                ActionBox.execute( trans.app, trans.sa_session, pja, job )
                            else:
                                job.add_post_job_action( pja )
                    else:
                        job, out_data = step.module.execute( trans, step.state )
                        outputs[ step.id ] = out_data
                    # Record invocation
                    workflow_invocation_step = model.WorkflowInvocationStep()
                    workflow_invocation_step.workflow_invocation = workflow_invocation
                    workflow_invocation_step.workflow_step = step
                    workflow_invocation_step.job = job
                # All jobs ran successfully, so we can save now
                trans.sa_session.add( workflow_invocation )
                trans.sa_session.flush()
                return trans.fill_template( "workflow/run_complete.mako",
                                            workflow=stored,
                                            outputs=outputs )
        else:
            # Prepare each step
            for step in workflow.steps:
                step.upgrade_messages = {}
                # Construct modules
                if step.type == 'tool' or step.type is None:
                    # Restore the tool state for the step
                    step.module = module_factory.from_workflow_step( trans, step )
                    # Fix any missing parameters
                    step.upgrade_messages = step.module.check_and_update_state()
                    if step.upgrade_messages:
                        has_upgrade_messages = True
                    # Any connected input needs to have value DummyDataset (these
                    # are not persisted so we need to do it every time)
                    step.module.add_dummy_datasets( connections=step.input_connections )
                    # Store state with the step
                    step.state = step.module.state
                    # Error dict
                    if step.tool_errors:
                        has_errors = True
                        errors[step.id] = step.tool_errors
                else:
                    ## Non-tool specific stuff?
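                    # Non-tool modules (e.g. input steps) carry no persisted tool
                    # state, so build a fresh runtime state for the run form.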
                    step.module = module_factory.from_workflow_step( trans, step )
                    step.state = step.module.get_runtime_state()
                # Connections by input name
                step.input_connections_by_name = \
                    dict( ( conn.input_name, conn ) for conn in step.input_connections )
        # Render the form
        return trans.fill_template(
                    "workflow/run.mako",
                    steps=workflow.steps,
                    workflow=stored,
                    has_upgrade_messages=has_upgrade_messages,
                    errors=errors,
                    incoming=kwargs )

    @web.expose
    def tag_outputs( self, trans, id, check_user=True, **kwargs ):
        stored = self.get_stored_workflow( trans, id, check_ownership=False )
        if check_user:
            user = trans.get_user()
            if stored.user != user:
                if trans.sa_session.query( model.StoredWorkflowUserShareAssociation ) \
                        .filter_by( user=user, stored_workflow=stored ).count() == 0:
                    error( "Workflow is not owned by or shared with current user" )
        # Get the latest revision
        workflow = stored.latest_workflow
        # It is possible for a workflow to have 0 steps
        if len( workflow.steps ) == 0:
            error( "Workflow cannot be tagged for outputs because it does not have any steps" )
        if workflow.has_cycles:
            error( "Workflow cannot be tagged for outputs because it contains cycles" )
        if workflow.has_errors:
            error( "Workflow cannot be tagged for outputs because of validation errors in some steps" )
        # Build the state for each step
        errors = {}
        has_upgrade_messages = False
        has_errors = False
        if kwargs:
            # If kwargs were provided, the states for each step should have
            # been POSTed
            for step in workflow.steps:
                if step.type == 'tool':
                    # Extract just the output flags for this step.
                    p = "%s|otag|" % step.id
                    l = len(p)
                    outputs = [ k[l:] for ( k, v ) in kwargs.iteritems() if k.startswith( p ) ]
                    if step.workflow_outputs:
                        for existing_output in step.workflow_outputs:
                            if existing_output.output_name not in outputs:
                                trans.sa_session.delete( existing_output )
                            else:
                                outputs.remove( existing_output.output_name )
                    for outputname in outputs:
                        m = model.WorkflowOutput( workflow_step_id=int( step.id ), output_name=outputname )
                        trans.sa_session.add( m )
        # Prepare each step
        trans.sa_session.flush()
        for step in workflow.steps:
            step.upgrade_messages = {}
            # Construct modules
            if step.type == 'tool' or step.type is None:
                # Restore the tool state for the step
                step.module = module_factory.from_workflow_step( trans, step )
                # Fix any missing parameters
                step.upgrade_messages = step.module.check_and_update_state()
                if step.upgrade_messages:
                    has_upgrade_messages = True
                # Any connected input needs to have value DummyDataset (these
                # are not persisted so we need to do it every time)
                step.module.add_dummy_datasets( connections=step.input_connections )
                # Store state with the step
                step.state = step.module.state
                # Error dict
                if step.tool_errors:
                    has_errors = True
                    errors[step.id] = step.tool_errors
            else:
                ## Non-tool specific stuff?
                step.module = module_factory.from_workflow_step( trans, step )
                step.state = step.module.get_runtime_state()
            # Connections by input name
            step.input_connections_by_name = \
                dict( ( conn.input_name, conn ) for conn in step.input_connections )
        # Render the form
        return trans.fill_template(
                    "workflow/tag_outputs.mako",
                    steps=workflow.steps,
                    workflow=stored,
                    has_upgrade_messages=has_upgrade_messages,
                    errors=errors,
                    incoming=kwargs )

    @web.expose
    def configure_menu( self, trans, workflow_ids=None ):
        user = trans.get_user()
        if trans.request.method == "POST":
            if workflow_ids is None:
                workflow_ids = []
            elif type( workflow_ids ) != list:
                workflow_ids = [ workflow_ids ]
            sess = trans.sa_session
            # This explicit remove seems like a hack, need to figure out
            # how to make the association do it automatically.
            for m in user.stored_workflow_menu_entries:
                sess.delete( m )
            user.stored_workflow_menu_entries = []
            q = sess.query( model.StoredWorkflow )
            # To ensure id list is unique
            seen_workflow_ids = set()
            for id in workflow_ids:
                if id in seen_workflow_ids:
                    continue
                else:
                    seen_workflow_ids.add( id )
                m = model.StoredWorkflowMenuEntry()
                m.stored_workflow = q.get( id )
                user.stored_workflow_menu_entries.append( m )
            sess.flush()
            return trans.show_message( "Menu updated", refresh_frames=['tools'] )
        else:
            user = trans.get_user()
            ids_in_menu = set( [ x.stored_workflow_id for x in user.stored_workflow_menu_entries ] )
            workflows = trans.sa_session.query( model.StoredWorkflow ) \
                .filter_by( user=user, deleted=False ) \
                .order_by( desc( model.StoredWorkflow.table.c.update_time ) ) \
                .all()
            shared_by_others = trans.sa_session \
                .query( model.StoredWorkflowUserShareAssociation ) \
                .filter_by( user=user ) \
                .filter( model.StoredWorkflow.deleted == False ) \
                .all()
            return trans.fill_template( "workflow/configure_menu.mako",
                                        workflows=workflows,
                                        shared_by_others=shared_by_others,
                                        ids_in_menu=ids_in_menu )

    def _workflow_to_dict( self, trans, stored ):
        """
        Converts a workflow to a dict of attributes suitable for exporting.
        """
        workflow = stored.latest_workflow
        workflow_annotation = self.get_item_annotation_obj( trans.sa_session, trans.user, stored )
        annotation_str = ""
        if workflow_annotation:
            annotation_str = workflow_annotation.annotation
        # Pack workflow data into a dictionary and return
        data = {}
        data['a_galaxy_workflow'] = 'true' # Placeholder for identifying galaxy workflow
        data['format-version'] = "0.1"
        data['name'] = workflow.name
        data['annotation'] = annotation_str
        data['steps'] = {}
        # For each step, rebuild the form and encode the state
        for step in workflow.steps:
            # Load from database representation
            module = module_factory.from_workflow_step( trans, step )
            # Get user annotation.
            step_annotation = self.get_item_annotation_obj( trans.sa_session, trans.user, step )
            annotation_str = ""
            if step_annotation:
                annotation_str = step_annotation.annotation
            # Step info
            step_dict = {
                'id': step.order_index,
                'type': module.type,
                'tool_id': module.get_tool_id(),
                'tool_version' : step.tool_version,
                'name': module.get_name(),
                'tool_state': module.get_state( secure=False ),
                'tool_errors': module.get_errors(),
                ## 'data_inputs': module.get_data_inputs(),
                ## 'data_outputs': module.get_data_outputs(),
                'annotation' : annotation_str
            }
            # Data inputs
            step_dict['inputs'] = []
            if module.type == "data_input":
                # Get input dataset name; default to 'Input Dataset'
                name = module.state.get( 'name', 'Input Dataset' )
                step_dict['inputs'].append( { "name" : name, "description" : annotation_str } )
            else:
                # Step is a tool and may have runtime inputs.
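                # A parameter left unset in the editor is stored as a RuntimeValue;
                # report it (and any RuntimeValue nested one level down inside a
                # grouping dict) as a runtime input of this step.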
                for name, val in module.state.inputs.items():
                    input_type = type( val )
                    if input_type == RuntimeValue:
                        step_dict['inputs'].append( { "name" : name, "description" : "runtime parameter for tool %s" % module.get_name() } )
                    elif input_type == dict:
                        # Input type is described by a dict, e.g. indexed parameters.
                        for partname, partval in val.items():
                            if type( partval ) == RuntimeValue:
                                step_dict['inputs'].append( { "name" : name, "description" : "runtime parameter for tool %s" % module.get_name() } )
            # User outputs
            step_dict['user_outputs'] = []
            """
            module_outputs = module.get_data_outputs()
            step_outputs = trans.sa_session.query( WorkflowOutput ).filter( step=step )
            for output in step_outputs:
                name = output.output_name
                annotation = ""
                for module_output in module_outputs:
                    if module_output.get( 'name', None ) == name:
                        output_type = module_output.get( 'extension', '' )
                        break
                data['outputs'][name] = { 'name' : name, 'annotation' : annotation, 'type' : output_type }
            """
            # All step outputs
            step_dict['outputs'] = []
            if type( module ) is ToolModule:
                for output in module.get_data_outputs():
                    step_dict['outputs'].append( { 'name' : output['name'], 'type' : output['extensions'][0] } )
            # Connections
            input_connections = step.input_connections
            if step.type is None or step.type == 'tool':
                # Determine full (prefixed) names of valid input datasets
                data_input_names = {}
                def callback( input, value, prefixed_name, prefixed_label ):
                    if isinstance( input, DataToolParameter ):
                        data_input_names[ prefixed_name ] = True
                visit_input_values( module.tool.inputs, module.state.inputs, callback )
                # Filter
                # FIXME: this removes connection without displaying a message currently!
                input_connections = [ conn for conn in input_connections if conn.input_name in data_input_names ]
            # Encode input connections as dictionary
            input_conn_dict = {}
            for conn in input_connections:
                input_conn_dict[ conn.input_name ] = \
                    dict( id=conn.output_step.order_index, output_name=conn.output_name )
            step_dict['input_connections'] = input_conn_dict
            # Position
            step_dict['position'] = step.position
            # Add to return value
            data['steps'][step.order_index] = step_dict
        return data

    def _workflow_from_dict( self, trans, data, source=None ):
        """
        Creates a workflow from a dict. Created workflow is stored in the database
        and returned.
        """
        # Put parameters in workflow mode
        trans.workflow_building_mode = True
        # Create new workflow from incoming dict
        workflow = model.Workflow()
        # If there's a source, put it in the workflow name.
        if source:
            name = "%s (imported from %s)" % ( data['name'], source )
        else:
            name = data['name']
        workflow.name = name
        # Assume no errors until we find a step that has some
        workflow.has_errors = False
        # Create each step
        steps = []
        # The editor will provide ids for each step that we don't need to save,
        # but do need to use to make connections
        steps_by_external_id = {}
        # First pass to build step objects and populate basic values
        for key, step_dict in data['steps'].iteritems():
            # Create the model class for the step
            step = model.WorkflowStep()
            steps.append( step )
            steps_by_external_id[ step_dict['id'] ] = step
            # FIXME: Position should be handled inside module
            step.position = step_dict['position']
            module = module_factory.from_dict( trans, step_dict, secure=False )
            module.save_to_step( step )
            if step.tool_errors:
                workflow.has_errors = True
            # Stick this in the step temporarily
            step.temp_input_connections = step_dict['input_connections']
            # Save step annotation.
            annotation = step_dict[ 'annotation' ]
            if annotation:
                annotation = sanitize_html( annotation, 'utf-8', 'text/html' )
                self.add_item_annotation( trans.sa_session, trans.get_user(), step, annotation )
        # Second pass to deal with connections between steps
        for step in steps:
            # Input connections
            for input_name, conn_dict in step.temp_input_connections.iteritems():
                if conn_dict:
                    conn = model.WorkflowStepConnection()
                    conn.input_step = step
                    conn.input_name = input_name
                    conn.output_name = conn_dict['output_name']
                    conn.output_step = steps_by_external_id[ conn_dict['id'] ]
            del step.temp_input_connections
        # Order the steps if possible
        attach_ordered_steps( workflow, steps )
        # Connect up
        stored = model.StoredWorkflow()
        stored.name = workflow.name
        workflow.stored_workflow = stored
        stored.latest_workflow = workflow
        stored.user = trans.user
        # Persist
        trans.sa_session.add( stored )
        trans.sa_session.flush()
        return stored

## ---- Utility methods -------------------------------------------------------

def attach_ordered_steps( workflow, steps ):
    ordered_steps = order_workflow_steps( steps )
    if ordered_steps:
        workflow.has_cycles = False
        for i, step in enumerate( ordered_steps ):
            step.order_index = i
            workflow.steps.append( step )
    else:
        workflow.has_cycles = True
        workflow.steps = steps

def edgelist_for_workflow_steps( steps ):
    """
    Create a list of tuples representing edges between `WorkflowSteps` based
    on associated `WorkflowStepConnection`s
    """
    edges = []
    steps_to_index = dict( ( step, i ) for i, step in enumerate( steps ) )
    for step in steps:
        edges.append( ( steps_to_index[step], steps_to_index[step] ) )
        for conn in step.input_connections:
            edges.append( ( steps_to_index[conn.output_step], steps_to_index[conn.input_step] ) )
    return edges

def order_workflow_steps( steps ):
    """
    Perform topological sort of the steps, return ordered or None
    """
    try:
        edges = edgelist_for_workflow_steps( steps )
        node_order = topsort( edges )
        return [ steps[i] for i in node_order ]
    except CycleError:
        return None

def order_workflow_steps_with_levels( steps ):
    try:
        return topsort_levels( edgelist_for_workflow_steps( steps ) )
    except CycleError:
        return None

class FakeJob( object ):
    """
    Fake job object for datasets that have no creating_job_associations,
    they will be treated as "input" datasets.
    """
    def __init__( self, dataset ):
        self.is_fake = True
        self.id = "fake_%s" % dataset.id

def get_job_dict( trans ):
    """
    Return a dictionary of Job -> [ Dataset ] mappings, for all finished
    active Datasets in the current history and the jobs that created them.
    """
    history = trans.get_history()
    # Get the jobs that created the datasets
    warnings = set()
    jobs = odict()
    for dataset in history.active_datasets:
        # FIXME: Create "Dataset.is_finished"
        if dataset.state in ( 'new', 'running', 'queued' ):
            warnings.add( "Some datasets still queued or running were ignored" )
            continue
        # If this hda was copied from another, we need to find the job that
        # created the original hda
        job_hda = dataset
        while job_hda.copied_from_history_dataset_association:
            job_hda = job_hda.copied_from_history_dataset_association
        if not job_hda.creating_job_associations:
            jobs[ FakeJob( dataset ) ] = [ ( None, dataset ) ]
        for assoc in job_hda.creating_job_associations:
            job = assoc.job
            if job in jobs:
                jobs[ job ].append( ( assoc.name, dataset ) )
            else:
                jobs[ job ] = [ ( assoc.name, dataset ) ]
    return jobs, warnings

def cleanup_param_values( inputs, values ):
    """
    Remove 'Data' values from `param_values`, along with metadata cruft,
    but track the associations.
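
    Returns a list of ( hid, prefixed_name ) tuples recording the data inputs
    that were cleared.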
""" associations = [] names_to_clean = [] # dbkey is pushed in by the framework if 'dbkey' in values: del values['dbkey'] root_values = values # Recursively clean data inputs and dynamic selects def cleanup( prefix, inputs, values ): for key, input in inputs.items(): if isinstance( input, ( SelectToolParameter, DrillDownSelectToolParameter ) ): if input.is_dynamic and not isinstance( values[key], UnvalidatedValue ): values[key] = UnvalidatedValue( values[key] ) if isinstance( input, DataToolParameter ): tmp = values[key] values[key] = None # HACK: Nested associations are not yet working, but we # still need to clean them up so we can serialize # if not( prefix ): if tmp: #this is false for a non-set optional dataset associations.append( ( tmp.hid, prefix + key ) ) # Cleanup the other deprecated crap associated with datasets # as well. Worse, for nested datasets all the metadata is # being pushed into the root. FIXME: MUST REMOVE SOON key = prefix + key + "_" for k in root_values.keys(): if k.startswith( key ): del root_values[k] elif isinstance( input, Repeat ): group_values = values[key] for i, rep_values in enumerate( group_values ): rep_index = rep_values['__index__'] prefix = "%s_%d|" % ( key, rep_index ) cleanup( prefix, input.inputs, group_values[i] ) elif isinstance( input, Conditional ): group_values = values[input.name] current_case = group_values['__current_case__'] prefix = "%s|" % ( key ) cleanup( prefix, input.cases[current_case].inputs, group_values ) cleanup( "", inputs, values ) return associations