""" Classes encapsulating galaxy tools and tool configuration. """ import pkg_resources; pkg_resources.require( "simplejson" ) import logging, os, string, sys, tempfile, glob, shutil, types, urllib import simplejson import binascii from UserDict import DictMixin from galaxy.util.odict import odict from galaxy.util.bunch import Bunch from galaxy.util.template import fill_template from galaxy import util, jobs, model from elementtree import ElementTree from parameters import * from parameters.grouping import * from parameters.output import ToolOutputActionGroup from parameters.validation import LateValidationError from parameters.input_translation import ToolInputTranslator from galaxy.util.expressions import ExpressionContext from galaxy.tools.test import ToolTestBuilder from galaxy.tools.actions import DefaultToolAction from galaxy.tools.deps import DependencyManager from galaxy.model import directory_hash_id from galaxy.util.none_like import NoneDataset from galaxy.datatypes import sniff from cgi import FieldStorage from galaxy.util.hash_util import * log = logging.getLogger( __name__ ) class ToolNotFoundException( Exception ): pass class ToolBox( object ): """ Container for a collection of tools """ def __init__( self, config_filename, tool_root_dir, app ): """ Create a toolbox from the config file names by `config_filename`, using `tool_root_directory` as the base directory for finding individual tool config files. """ self.tools_by_id = {} self.workflows_by_id = {} self.tool_panel = odict() self.tool_root_dir = tool_root_dir self.app = app self.init_dependency_manager() try: self.init_tools( config_filename ) except: log.exception( "ToolBox error reading %s", config_filename ) def init_tools( self, config_filename ): """ Read the configuration file and load each tool. The following tags are currently supported: # tools outside sections """ def load_tool( elem, panel_dict ): try: path = elem.get( "file" ) tool = self.load_tool( os.path.join( self.tool_root_dir, path ) ) self.tools_by_id[ tool.id ] = tool key = 'tool_' + tool.id panel_dict[ key ] = tool log.debug( "Loaded tool: %s %s" % ( tool.id, tool.version ) ) except: log.exception( "error reading tool from path: %s" % path ) def load_workflow( elem, panel_dict ): try: # TODO: should id be encoded? workflow_id = elem.get( 'id' ) workflow = self.load_workflow( workflow_id ) self.workflows_by_id[ workflow_id ] = workflow key = 'workflow_' + workflow_id panel_dict[ key ] = workflow log.debug( "Loaded workflow: %s %s" % ( workflow_id, workflow.name ) ) except: log.exception( "error loading workflow: %s" % workflow_id ) def load_label( elem, panel_dict ): label = ToolSectionLabel( elem ) key = 'label_' + label.id panel_dict[ key ] = label def load_section( elem, panel_dict ): section = ToolSection( elem ) log.debug( "Loading section: %s" % section.name ) for section_elem in elem: if section_elem.tag == 'tool': load_tool( section_elem, section.elems ) elif section_elem.tag == 'workflow': load_workflow( section_elem, section.elems ) elif section_elem.tag == 'label': load_label( section_elem, section.elems ) key = 'section_' + section.id panel_dict[ key ] = section log.info("parsing the tool configuration") tree = util.parse_xml( config_filename ) root = tree.getroot() for elem in root: if elem.tag == 'tool': load_tool( elem, self.tool_panel ) elif elem.tag == 'workflow': load_workflow( elem, self.tool_panel ) elif elem.tag == 'section' : load_section( elem, self.tool_panel ) elif elem.tag == 'label': load_label( elem, self.tool_panel ) def load_tool( self, config_file ): """ Load a single tool from the file named by `config_file` and return an instance of `Tool`. """ # Parse XML configuration file and get the root element tree = util.parse_xml( config_file ) root = tree.getroot() # Allow specifying a different tool subclass to instantiate if root.find( "type" ) is not None: type_elem = root.find( "type" ) module = type_elem.get( 'module', 'galaxy.tools' ) cls = type_elem.get( 'class' ) mod = __import__( module, globals(), locals(), [cls]) ToolClass = getattr( mod, cls ) elif root.get( 'tool_type', None ) is not None: ToolClass = tool_types.get( root.get( 'tool_type' ) ) else: ToolClass = Tool return ToolClass( config_file, root, self.app ) def reload( self, tool_id ): """ Attempt to reload the tool identified by 'tool_id', if successful replace the old tool. """ if tool_id not in self.tools_by_id: raise ToolNotFoundException( "No tool with id %s" % tool_id ) old_tool = self.tools_by_id[ tool_id ] new_tool = self.load_tool( old_tool.config_file ) # Replace old_tool with new_tool in self.tool_panel tool_key = 'tool_' + tool_id for key, val in self.tool_panel.items(): if key == tool_key: self.tool_panel[ key ] = new_tool break elif key.startswith( 'section' ): section = val for section_key, section_val in section.elems.items(): if section_key == tool_key: self.tool_panel[ key ].elems[ section_key ] = new_tool break self.tools_by_id[ tool_id ] = new_tool log.debug( "Reloaded tool %s %s" %( old_tool.id, old_tool.version ) ) def load_workflow( self, workflow_id ): """ Return an instance of 'Workflow' identified by `id`, which is encoded in the tool panel. """ id = self.app.security.decode_id( workflow_id ) stored = self.app.model.context.query( self.app.model.StoredWorkflow ).get( id ) return stored.latest_workflow def init_dependency_manager( self ): self.dependency_manager = None if self.app.config.use_tool_dependencies: self.dependency_manager = DependencyManager( [ self.app.config.tool_dependency_dir ] ) class ToolSection( object ): """ A group of tools with similar type/purpose that will be displayed as a group in the user interface. """ def __init__( self, elem ): self.name = elem.get( "name" ) self.id = elem.get( "id" ) self.version = elem.get( "version" ) self.elems = odict() class ToolSectionLabel( object ): """ A label for a set of tools that can be displayed above groups of tools and sections in the user interface """ def __init__( self, elem ): self.text = elem.get( "text" ) self.id = elem.get( "id" ) self.version = elem.get( "version" ) class DefaultToolState( object ): """ Keeps track of the state of a users interaction with a tool between requests. The default tool state keeps track of the current page (for multipage "wizard" tools) and the values of all parameters. """ def __init__( self ): self.page = 0 self.inputs = None def encode( self, tool, app, secure=True ): """ Convert the data to a string """ # Convert parameters to a dictionary of strings, and save curent # page in that dict value = params_to_strings( tool.inputs, self.inputs, app ) value["__page__"] = self.page value = simplejson.dumps( value ) # Make it secure if secure: a = hmac_new( app.config.tool_secret, value ) b = binascii.hexlify( value ) return "%s:%s" % ( a, b ) else: return value def decode( self, value, tool, app, secure=True ): """ Restore the state from a string """ if secure: # Extract and verify hash a, b = value.split( ":" ) value = binascii.unhexlify( b ) test = hmac_new( app.config.tool_secret, value ) assert a == test # Restore from string values = json_fix( simplejson.loads( value ) ) self.page = values.pop( "__page__" ) self.inputs = params_from_strings( tool.inputs, values, app, ignore_errors=True ) class ToolOutput( object ): """ Represents an output datasets produced by a tool. For backward compatibility this behaves as if it were the tuple: (format, metadata_source, parent) """ def __init__( self, name, format=None, metadata_source=None, parent=None, label=None, filters = None, actions = None ): self.name = name self.format = format self.metadata_source = metadata_source self.parent = parent self.label = label self.filters = filters or [] self.actions = actions # Tuple emulation def __len__( self ): return 3 def __getitem__( self, index ): if index == 0: return self.format elif index == 1: return self.metadata_source elif index == 2: return self.parent else: raise IndexError( index ) def __iter__( self ): return iter( ( self.format, self.metadata_source, self.parent ) ) class ToolRequirement( object ): """ Represents an external requirement that must be available for the tool to run (for example, a program, package, or library). Requirements can optionally assert a specific version """ def __init__( self ): self.name = None self.type = None self.version = None class Tool: """ Represents a computational tool that can be executed through Galaxy. """ tool_type = 'default' def __init__( self, config_file, root, app ): """ Load a tool from the config named by `config_file` """ # Determine the full path of the directory where the tool config is self.config_file = config_file self.tool_dir = os.path.dirname( config_file ) self.app = app # Parse XML element containing configuration self.parse( root ) @property def sa_session( self ): """ Returns a SQLAlchemy session """ return self.app.model.context def parse( self, root ): """ Read tool configuration from the element `root` and fill in `self`. """ # Get the (user visible) name of the tool self.name = root.get( "name" ) if not self.name: raise Exception, "Missing tool 'name'" # Get the UNIQUE id for the tool # TODO: can this be generated automatically? self.id = root.get( "id" ) if not self.id: raise Exception, "Missing tool 'id'" self.version = root.get( "version" ) if not self.version: # For backward compatibility, some tools may not have versions yet. self.version = "1.0.0" # Support multi-byte tools self.is_multi_byte = util.string_as_bool( root.get( "is_multi_byte", False ) ) # Force history to fully refresh after job execution for this tool. # Useful i.e. when an indeterminate number of outputs are created by # a tool. self.force_history_refresh = util.string_as_bool( root.get( 'force_history_refresh', 'False' ) ) # Load input translator, used by datasource tools to change # names/values of incoming parameters self.input_translator = root.find( "request_param_translation" ) if self.input_translator: self.input_translator = ToolInputTranslator.from_element( self.input_translator ) # Command line (template). Optional for tools that do not invoke a # local program command = root.find("command") if command is not None and command.text is not None: self.command = command.text.lstrip() # get rid of leading whitespace interpreter = command.get("interpreter") if interpreter: # TODO: path munging for cluster/dataset server relocatability executable = self.command.split()[0] abs_executable = os.path.abspath(os.path.join(self.tool_dir, executable)) self.command = self.command.replace(executable, abs_executable, 1) self.command = interpreter + " " + self.command else: self.command = '' # Parameters used to build URL for redirection to external app redirect_url_params = root.find( "redirect_url_params" ) if redirect_url_params is not None and redirect_url_params.text is not None: # get rid of leading / trailing white space redirect_url_params = redirect_url_params.text.strip() # Replace remaining white space with something we can safely split on later # when we are building the params self.redirect_url_params = redirect_url_params.replace( ' ', '**^**' ) else: self.redirect_url_params = '' # Short description of the tool self.description = util.xml_text(root, "description") # Job runner if self.app.config.start_job_runners is None: # Jobs are always local regardless of tool config if no additional # runners are started self.job_runner = "local:///" else: # Set job runner to the cluster default self.job_runner = self.app.config.default_cluster_job_runner for tup in self.app.config.tool_runners: if tup[0] == self.id.lower(): self.job_runner = tup[1] break # Is this a 'hidden' tool (hidden in tool menu) self.hidden = util.xml_text(root, "hidden") if self.hidden: self.hidden = util.string_as_bool(self.hidden) # Load any tool specific code (optional) Edit: INS 5/29/2007, # allow code files to have access to the individual tool's # "module" if it has one. Allows us to reuse code files, etc. self.code_namespace = dict() self.hook_map = {} for code_elem in root.findall("code"): for hook_elem in code_elem.findall("hook"): for key, value in hook_elem.items(): # map hook to function self.hook_map[key]=value file_name = code_elem.get("file") code_path = os.path.join( self.tool_dir, file_name ) execfile( code_path, self.code_namespace ) # Load any tool specific options (optional) self.options = dict( sanitize=True, refresh=False ) for option_elem in root.findall("options"): for option, value in self.options.copy().items(): if isinstance(value, type(False)): self.options[option] = util.string_as_bool(option_elem.get(option, str(value))) else: self.options[option] = option_elem.get(option, str(value)) self.options = Bunch(** self.options) # Parse tool inputs (if there are any required) self.parse_inputs( root ) # Parse tool help self.parse_help( root ) # Description of outputs produced by an invocation of the tool self.parse_outputs( root ) # Any extra generated config files for the tool self.config_files = [] conf_parent_elem = root.find("configfiles") if conf_parent_elem: for conf_elem in conf_parent_elem.findall( "configfile" ): name = conf_elem.get( "name" ) filename = conf_elem.get( "filename", None ) text = conf_elem.text self.config_files.append( ( name, filename, text ) ) # Action action_elem = root.find( "action" ) if action_elem is None: self.tool_action = DefaultToolAction() else: module = action_elem.get( 'module' ) cls = action_elem.get( 'class' ) mod = __import__( module, globals(), locals(), [cls]) self.tool_action = getattr( mod, cls )() # User interface hints self.uihints = {} uihints_elem = root.find( "uihints" ) if uihints_elem is not None: for key, value in uihints_elem.attrib.iteritems(): self.uihints[ key ] = value # Tests tests_elem = root.find( "tests" ) if tests_elem: try: self.parse_tests( tests_elem ) except: log.exception( "Failed to parse tool tests" ) else: self.tests = None # Requirements (dependencies) self.requirements = [] requirements_elem = root.find( "requirements" ) if requirements_elem: self.parse_requirements( requirements_elem ) # Determine if this tool can be used in workflows self.is_workflow_compatible = self.check_workflow_compatible() def parse_inputs( self, root ): """ Parse the "" element and create appropriate `ToolParameter`s. This implementation supports multiple pages and grouping constructs. """ # Load parameters (optional) input_elem = root.find("inputs") if input_elem: # Handle properties of the input form self.check_values = util.string_as_bool( input_elem.get("check_values", "true") ) self.nginx_upload = util.string_as_bool( input_elem.get( "nginx_upload", "false" ) ) self.action = input_elem.get( 'action', '/tool_runner/index' ) # If we have an nginx upload, save the action as a tuple instead of # a string. The actual action needs to get url_for run to add any # prefixes, and we want to avoid adding the prefix to the # nginx_upload_path. This logic is handled in the tool_form.mako # template. if self.nginx_upload and self.app.config.nginx_upload_path: if '?' in urllib.unquote_plus( self.action ): raise Exception( 'URL parameters in a non-default tool action can not be used ' \ 'in conjunction with nginx upload. Please convert them to ' \ 'hidden POST parameters' ) self.action = (self.app.config.nginx_upload_path + '?nginx_redir=', urllib.unquote_plus(self.action)) self.target = input_elem.get( "target", "galaxy_main" ) self.method = input_elem.get( "method", "post" ) # Parse the actual parameters self.inputs = odict() self.inputs_by_page = list() self.display_by_page = list() enctypes = set() # Handle multiple page case pages = input_elem.findall( "page" ) for page in ( pages or [ input_elem ] ): display, inputs = self.parse_input_page( page, enctypes ) self.inputs_by_page.append( inputs ) self.inputs.update( inputs ) self.display_by_page.append( display ) self.display = self.display_by_page[0] self.npages = len( self.inputs_by_page ) self.last_page = len( self.inputs_by_page ) - 1 self.has_multiple_pages = bool( self.last_page ) # Determine the needed enctype for the form if len( enctypes ) == 0: self.enctype = "application/x-www-form-urlencoded" elif len( enctypes ) == 1: self.enctype = enctypes.pop() else: raise Exception, "Conflicting required enctypes: %s" % str( enctypes ) # Check if the tool either has no parameters or only hidden (and # thus hardcoded) parameters. FIXME: hidden parameters aren't # parameters at all really, and should be passed in a different # way, making this check easier. self.input_required = False for param in self.inputs.values(): if not isinstance( param, ( HiddenToolParameter, BaseURLToolParameter ) ): self.input_required = True break def parse_help( self, root ): """ Parse the help text for the tool. Formatted in reStructuredText. This implementation supports multiple pages. """ # TODO: Allow raw HTML or an external link. self.help = root.find("help") self.help_by_page = list() help_header = "" help_footer = "" if self.help is not None: help_pages = self.help.findall( "page" ) help_header = self.help.text try: self.help = util.rst_to_html(self.help.text) except: log.exception( "error in help for tool %s" % self.name ) # Multiple help page case if help_pages: for help_page in help_pages: self.help_by_page.append( help_page.text ) help_footer = help_footer + help_page.tail # Each page has to rendered all-together because of backreferences allowed by rst try: self.help_by_page = [ util.rst_to_html( help_header + x + help_footer ) for x in self.help_by_page ] except: log.exception( "error in multi-page help for tool %s" % self.name ) # Pad out help pages to match npages ... could this be done better? while len( self.help_by_page ) < self.npages: self.help_by_page.append( self.help ) def parse_outputs( self, root ): """ Parse elements and fill in self.outputs (keyed by name) """ self.outputs = odict() out_elem = root.find("outputs") if not out_elem: return for data_elem in out_elem.findall("data"): output = ToolOutput( data_elem.get("name") ) output.format = data_elem.get("format", "data") output.change_format = data_elem.findall("change_format") output.metadata_source = data_elem.get("metadata_source", "") output.parent = data_elem.get("parent", None) output.label = util.xml_text( data_elem, "label" ) output.count = int( data_elem.get("count", 1) ) output.filters = data_elem.findall( 'filter' ) output.tool = self output.actions = ToolOutputActionGroup( output, data_elem.find( 'actions' ) ) self.outputs[ output.name ] = output def parse_tests( self, tests_elem ): """ Parse any "" elements, create a `ToolTestBuilder` for each and store in `self.tests`. """ self.tests = [] # Composite datasets need a unique name: each test occurs in a fresh # history, but we'll keep it unique per set of tests composite_data_names_counter = 0 for i, test_elem in enumerate( tests_elem.findall( 'test' ) ): name = test_elem.get( 'name', 'Test-%d' % (i+1) ) maxseconds = int( test_elem.get( 'maxseconds', '120' ) ) test = ToolTestBuilder( self, name, maxseconds ) try: for param_elem in test_elem.findall( "param" ): attrib = dict( param_elem.attrib ) if 'values' in attrib: value = attrib[ 'values' ].split( ',' ) elif 'value' in attrib: value = attrib['value'] else: value = None attrib['children'] = list( param_elem.getchildren() ) if attrib['children']: # At this time, we can assume having children only # occurs on DataToolParameter test items but this could # change and would cause the below parsing to change # based upon differences in children items attrib['metadata'] = [] attrib['composite_data'] = [] attrib['edit_attributes'] = [] # Composite datasets need to be renamed uniquely composite_data_name = None for child in attrib['children']: if child.tag == 'composite_data': attrib['composite_data'].append( child ) if composite_data_name is None: # Generate a unique name; each test uses a # fresh history composite_data_name = '_COMPOSITE_RENAMED_%i_' \ % ( composite_data_names_counter ) composite_data_names_counter += 1 elif child.tag == 'metadata': attrib['metadata'].append( child ) elif child.tag == 'metadata': attrib['metadata'].append( child ) elif child.tag == 'edit_attributes': attrib['edit_attributes'].append( child ) if composite_data_name: # Composite datasets need implicit renaming; # inserted at front of list so explicit declarations # take precedence attrib['edit_attributes'].insert( 0, { 'type': 'name', 'value': composite_data_name } ) test.add_param( attrib.pop( 'name' ), value, attrib ) for output_elem in test_elem.findall( "output" ): attrib = dict( output_elem.attrib ) name = attrib.pop( 'name', None ) if name is None: raise Exception( "Test output does not have a 'name'" ) file = attrib.pop( 'file', None ) if file is None: raise Exception( "Test output does not have a 'file'") attributes = {} # Method of comparison attributes['compare'] = attrib.pop( 'compare', 'diff' ).lower() # Number of lines to allow to vary in logs (for dates, etc) attributes['lines_diff'] = int( attrib.pop( 'lines_diff', '0' ) ) # Allow a file size to vary if sim_size compare attributes['delta'] = int( attrib.pop( 'delta', '10000' ) ) attributes['sort'] = util.string_as_bool( attrib.pop( 'sort', False ) ) attributes['extra_files'] = [] for extra in output_elem.findall( 'extra_files' ): # File or directory, when directory, compare basename # by basename extra_type = extra.get( 'type', 'file' ) extra_name = extra.get( 'name', None ) assert extra_type == 'directory' or extra_name is not None, \ 'extra_files type (%s) requires a name attribute' % extra_type extra_value = extra.get( 'value', None ) assert extra_value is not None, 'extra_files requires a value attribute' extra_attributes = {} extra_attributes['compare'] = extra.get( 'compare', 'diff' ).lower() extra_attributes['delta'] = extra.get( 'delta', '0' ) extra_attributes['lines_diff'] = int( extra.get( 'lines_diff', '0' ) ) extra_attributes['sort'] = util.string_as_bool( extra.get( 'sort', False ) ) attributes['extra_files'].append( ( extra_type, extra_value, extra_name, extra_attributes ) ) test.add_output( name, file, attributes ) except Exception, e: test.error = True test.exception = e self.tests.append( test ) def parse_input_page( self, input_elem, enctypes ): """ Parse a page of inputs. This basically just calls 'parse_input_elem', but it also deals with possible 'display' elements which are supported only at the top/page level (not in groups). """ inputs = self.parse_input_elem( input_elem, enctypes ) # Display display_elem = input_elem.find("display") if display_elem is not None: display = util.xml_to_string(display_elem) else: display = None return display, inputs def parse_input_elem( self, parent_elem, enctypes, context=None ): """ Parse a parent element whose children are inputs -- these could be groups (repeat, conditional) or param elements. Groups will be parsed recursively. """ rval = odict() context = ExpressionContext( rval, context ) for elem in parent_elem: # Repeat group if elem.tag == "repeat": group = Repeat() group.name = elem.get( "name" ) group.title = elem.get( "title" ) group.inputs = self.parse_input_elem( elem, enctypes, context ) group.default = int( elem.get( "default", 0 ) ) group.min = int( elem.get( "min", 0 ) ) # Use float instead of int so that 'inf' can be used for no max group.max = float( elem.get( "max", "inf" ) ) assert group.min <= group.max, \ ValueError( "Min repeat count must be less-than-or-equal to the max." ) # Force default to be within min-max range group.default = min( max( group.default, group.min ), group.max ) rval[group.name] = group elif elem.tag == "conditional": group = Conditional() group.name = elem.get( "name" ) group.value_ref = elem.get( 'value_ref', None ) group.value_ref_in_group = util.string_as_bool( elem.get( 'value_ref_in_group', 'True' ) ) value_from = elem.get( "value_from" ) if value_from: value_from = value_from.split( ':' ) group.value_from = locals().get( value_from[0] ) group.test_param = rval[ group.value_ref ] group.test_param.refresh_on_change = True for attr in value_from[1].split( '.' ): group.value_from = getattr( group.value_from, attr ) for case_value, case_inputs in group.value_from( context, group, self ).iteritems(): case = ConditionalWhen() case.value = case_value if case_inputs: case.inputs = self.parse_input_elem( ElementTree.XML( "%s" % case_inputs ), enctypes, context ) else: case.inputs = {} group.cases.append( case ) else: # Should have one child "input" which determines the case input_elem = elem.find( "param" ) assert input_elem is not None, " must have a child " group.test_param = self.parse_param_elem( input_elem, enctypes, context ) # Must refresh when test_param changes group.test_param.refresh_on_change = True # And a set of possible cases for case_elem in elem.findall( "when" ): case = ConditionalWhen() case.value = case_elem.get( "value" ) case.inputs = self.parse_input_elem( case_elem, enctypes, context ) group.cases.append( case ) rval[group.name] = group elif elem.tag == "upload_dataset": group = UploadDataset() group.name = elem.get( "name" ) group.title = elem.get( "title" ) group.file_type_name = elem.get( 'file_type_name', group.file_type_name ) group.default_file_type = elem.get( 'default_file_type', group.default_file_type ) group.metadata_ref = elem.get( 'metadata_ref', group.metadata_ref ) rval[ group.file_type_name ].refresh_on_change = True rval[ group.file_type_name ].refresh_on_change_values = \ self.app.datatypes_registry.get_composite_extensions() group.inputs = self.parse_input_elem( elem, enctypes, context ) rval[ group.name ] = group elif elem.tag == "param": param = self.parse_param_elem( elem, enctypes, context ) rval[param.name] = param return rval def parse_param_elem( self, input_elem, enctypes, context ): """ Parse a single "" element and return a ToolParameter instance. Also, if the parameter has a 'required_enctype' add it to the set enctypes. """ param = ToolParameter.build( self, input_elem ) param_enctype = param.get_required_enctype() if param_enctype: enctypes.add( param_enctype ) # If parameter depends on any other paramters, we must refresh the # form when it changes for name in param.get_dependencies(): context[ name ].refresh_on_change = True return param def parse_requirements( self, requirements_elem ): """ Parse each requirement from the element and add to self.requirements """ for requirement_elem in requirements_elem.findall( 'requirement' ): requirement = ToolRequirement() requirement.name = util.xml_text( requirement_elem ) requirement.type = requirement_elem.get( "type", "package" ) requirement.version = requirement_elem.get( "version" ) self.requirements.append( requirement ) def check_workflow_compatible( self ): """ Determine if a tool can be used in workflows. External tools and the upload tool are currently not supported by workflows. """ # Multiple page tools are not supported -- we're eliminating most # of these anyway if self.has_multiple_pages: return False # This is probably the best bet for detecting external web tools # right now if self.action != "/tool_runner/index": return False # HACK: upload is (as always) a special case becuase file parameters # can't be persisted. if self.id == "upload1": return False # TODO: Anyway to capture tools that dynamically change their own # outputs? return True def new_state( self, trans, all_pages=False ): """ Create a new `DefaultToolState` for this tool. It will be initialized with default values for inputs. Only inputs on the first page will be initialized unless `all_pages` is True, in which case all inputs regardless of page are initialized. """ state = DefaultToolState() state.inputs = {} if all_pages: inputs = self.inputs else: inputs = self.inputs_by_page[ 0 ] self.fill_in_new_state( trans, inputs, state.inputs ) return state def fill_in_new_state( self, trans, inputs, state, context=None ): """ Fill in a tool state dictionary with default values for all parameters in the dictionary `inputs`. Grouping elements are filled in recursively. """ context = ExpressionContext( state, context ) for input in inputs.itervalues(): state[ input.name ] = input.get_initial_value( trans, context ) def get_param_html_map( self, trans, page=0, other_values={} ): """ Return a dictionary containing the HTML representation of each parameter. This is used for rendering display elements. It is currently not compatible with grouping constructs. NOTE: This should be considered deprecated, it is only used for tools with `display` elements. These should be eliminated. """ rval = dict() for key, param in self.inputs_by_page[page].iteritems(): if not isinstance( param, ToolParameter ): raise Exception( "'get_param_html_map' only supported for simple paramters" ) rval[key] = param.get_html( trans, other_values=other_values ) return rval def get_param( self, key ): """ Returns the parameter named `key` or None if there is no such parameter. """ return self.inputs.get( key, None ) def get_hook(self, name): """ Returns an object from the code file referenced by `code_namespace` (this will normally be a callable object) """ if self.code_namespace: # Try to look up hook in self.hook_map, otherwise resort to default if name in self.hook_map and self.hook_map[name] in self.code_namespace: return self.code_namespace[self.hook_map[name]] elif name in self.code_namespace: return self.code_namespace[name] return None def visit_inputs( self, value, callback ): """ Call the function `callback` on each parameter of this tool. Visits grouping parameters recursively and constructs unique prefixes for each nested set of parameters. The callback method is then called as: `callback( level_prefix, parameter, parameter_value )` """ # HACK: Yet another hack around check_values -- WHY HERE? if not self.check_values: return for input in self.inputs.itervalues(): if isinstance( input, ToolParameter ): callback( "", input, value[input.name] ) else: input.visit_inputs( "", value[input.name], callback ) def handle_input( self, trans, incoming ): """ Process incoming parameters for this tool from the dict `incoming`, update the tool state (or create if none existed), and either return to the form or execute the tool (only if 'execute' was clicked and there were no errors). """ # Get the state or create if not found if "tool_state" in incoming: encoded_state = util.string_to_object( incoming["tool_state"] ) state = DefaultToolState() state.decode( encoded_state, self, trans.app ) else: state = self.new_state( trans ) # This feels a bit like a hack. It allows forcing full processing # of inputs even when there is no state in the incoming dictionary # by providing either 'runtool_btn' (the name of the submit button # on the standard run form) or "URL" (a parameter provided by # external data source tools). if "runtool_btn" not in incoming and "URL" not in incoming: return "tool_form.mako", dict( errors={}, tool_state=state, param_values={}, incoming={} ) # Process incoming data if not( self.check_values ): # If `self.check_values` is false we don't do any checking or # processing on input parameters. This is used to pass raw values # through to/from external sites. FIXME: This should be handled # more cleanly, there is no reason why external sites need to # post back to the same URL that the tool interface uses. errors = {} params = incoming else: # Update state for all inputs on the current page taking new # values from `incoming`. errors = self.update_state( trans, self.inputs_by_page[state.page], state.inputs, incoming ) # If the tool provides a `validate_input` hook, call it. validate_input = self.get_hook( 'validate_input' ) if validate_input: validate_input( trans, errors, state.inputs, self.inputs_by_page[state.page] ) params = state.inputs # Did the user actually click next / execute or is this just # a refresh? if 'runtool_btn' in incoming or 'URL' in incoming or 'ajax_upload' in incoming: # If there were errors, we stay on the same page and display # error messages if errors: error_message = "One or more errors were found in the input you provided. The specific errors are marked below." return "tool_form.mako", dict( errors=errors, tool_state=state, incoming=incoming, error_message=error_message ) # If we've completed the last page we can execute the tool elif state.page == self.last_page: _, out_data = self.execute( trans, incoming=params ) try: assert isinstance( out_data, odict ) return 'tool_executed.mako', dict( out_data=out_data ) except: return 'message.mako', dict( status='error', message='odict not returned from tool execution', refresh_frames=[] ) # Otherwise move on to the next page else: state.page += 1 # Fill in the default values for the next page self.fill_in_new_state( trans, self.inputs_by_page[ state.page ], state.inputs ) return 'tool_form.mako', dict( errors=errors, tool_state=state ) else: try: self.find_fieldstorage( state.inputs ) except InterruptedUpload: # If inputs contain a file it won't persist. Most likely this # is an interrupted upload. We should probably find a more # standard method of determining an incomplete POST. return self.handle_interrupted( trans, state.inputs ) except: pass # Just a refresh, render the form with updated state and errors. return 'tool_form.mako', dict( errors=errors, tool_state=state ) def find_fieldstorage( self, x ): if isinstance( x, FieldStorage ): raise InterruptedUpload( None ) elif type( x ) is types.DictType: [ self.find_fieldstorage( y ) for y in x.values() ] elif type( x ) is types.ListType: [ self.find_fieldstorage( y ) for y in x ] def handle_interrupted( self, trans, inputs ): """ Upon handling inputs, if it appears that we have received an incomplete form, do some cleanup or anything else deemed necessary. Currently this is only likely during file uploads, but this method could be generalized and a method standardized for handling other tools. """ # If the async upload tool has uploading datasets, we need to error them. if 'async_datasets' in inputs and inputs['async_datasets'] not in [ 'None', '', None ]: for id in inputs['async_datasets'].split(','): try: data = self.sa_session.query( trans.model.HistoryDatasetAssociation ).get( int( id ) ) except: log.exception( 'Unable to load precreated dataset (%s) sent in upload form' % id ) continue if trans.user is None and trans.galaxy_session.current_history != data.history: log.error( 'Got a precreated dataset (%s) but it does not belong to anonymous user\'s current session (%s)' % ( data.id, trans.galaxy_session.id ) ) elif data.history.user != trans.user: log.error( 'Got a precreated dataset (%s) but it does not belong to current user (%s)' % ( data.id, trans.user.id ) ) else: data.state = data.states.ERROR data.info = 'Upload of this dataset was interrupted. Please try uploading again or' self.sa_session.add( data ) self.sa_session.flush() # It's unlikely the user will ever see this. return 'message.mako', dict( status='error', message='Your upload was interrupted. If this was uninentional, please retry it.', refresh_frames=[], cont=None ) def update_state( self, trans, inputs, state, incoming, prefix="", context=None, update_only=False, old_errors={}, item_callback=None ): """ Update the tool state in `state` using the user input in `incoming`. This is designed to be called recursively: `inputs` contains the set of inputs being processed, and `prefix` specifies a prefix to add to the name of each input to extract it's value from `incoming`. If `update_only` is True, values that are not in `incoming` will not be modified. In this case `old_errors` can be provided, and any errors for parameters which were *not* updated will be preserved. """ errors = dict() # Push this level onto the context stack context = ExpressionContext( state, context ) # Iterate inputs and update (recursively) for input in inputs.itervalues(): key = prefix + input.name if isinstance( input, Repeat ): group_state = state[input.name] # Create list of empty errors for each previously existing state group_errors = [ {} for i in range( len( group_state ) ) ] group_old_errors = old_errors.get( input.name, None ) any_group_errors = False # Check any removals before updating state -- only one # removal can be performed, others will be ignored for i, rep_state in enumerate( group_state ): rep_index = rep_state['__index__'] if key + "_" + str(rep_index) + "_remove" in incoming: if len( group_state ) > input.min: del group_state[i] del group_errors[i] if group_old_errors: del group_old_errors[i] break else: group_errors[i] = { '__index__': 'Cannot remove repeat (min size=%i).' % input.min } any_group_errors = True # Only need to find one that can't be removed due to size, since only # one removal is processed at # a time anyway break # Update state max_index = -1 for i, rep_state in enumerate( group_state ): rep_index = rep_state['__index__'] max_index = max( max_index, rep_index ) rep_prefix = "%s_%d|" % ( key, rep_index ) if group_old_errors: rep_old_errors = group_old_errors[i] else: rep_old_errors = {} rep_errors = self.update_state( trans, input.inputs, rep_state, incoming, prefix=rep_prefix, context=context, update_only=update_only, old_errors=rep_old_errors, item_callback=item_callback ) if rep_errors: any_group_errors = True group_errors[i].update( rep_errors ) # Check for addition if key + "_add" in incoming: if len( group_state ) < input.max: new_state = {} new_state['__index__'] = max_index + 1 self.fill_in_new_state( trans, input.inputs, new_state, context ) group_state.append( new_state ) group_errors.append( {} ) else: group_errors[-1] = { '__index__': 'Cannot add repeat (max size=%i).' % input.max } any_group_errors = True # Were there *any* errors for any repetition? if any_group_errors: errors[input.name] = group_errors elif isinstance( input, Conditional ): group_state = state[input.name] group_old_errors = old_errors.get( input.name, {} ) old_current_case = group_state['__current_case__'] group_prefix = "%s|" % ( key ) # Deal with the 'test' element and see if it's value changed if input.value_ref and not input.value_ref_in_group: # We are referencing an existent parameter, which is not # part of this group test_param_key = prefix + input.test_param.name else: test_param_key = group_prefix + input.test_param.name test_param_error = None test_incoming = get_incoming_value( incoming, test_param_key, None ) if test_param_key not in incoming \ and "__force_update__" + test_param_key not in incoming \ and update_only: # Update only, keep previous value and state, but still # recurse in case there are nested changes value = group_state[ input.test_param.name ] current_case = old_current_case if input.test_param.name in old_errors: errors[ input.test_param.name ] = old_errors[ input.test_param.name ] else: # Get value of test param and determine current case value, test_param_error = \ check_param( trans, input.test_param, test_incoming, context ) current_case = input.get_current_case( value, trans ) if current_case != old_current_case: # Current case has changed, throw away old state group_state = state[input.name] = {} # TODO: we should try to preserve values if we can self.fill_in_new_state( trans, input.cases[current_case].inputs, group_state, context ) group_errors = dict() group_old_errors = dict() else: # Current case has not changed, update children group_errors = self.update_state( trans, input.cases[current_case].inputs, group_state, incoming, prefix=group_prefix, context=context, update_only=update_only, old_errors=group_old_errors, item_callback=item_callback ) if test_param_error: group_errors[ input.test_param.name ] = test_param_error if group_errors: errors[ input.name ] = group_errors # Store the current case in a special value group_state['__current_case__'] = current_case # Store the value of the test element group_state[ input.test_param.name ] = value elif isinstance( input, UploadDataset ): group_state = state[input.name] group_errors = [] group_old_errors = old_errors.get( input.name, None ) any_group_errors = False d_type = input.get_datatype( trans, context ) writable_files = d_type.writable_files #remove extra files while len( group_state ) > len( writable_files ): del group_state[-1] if group_old_errors: del group_old_errors[-1] # Update state max_index = -1 for i, rep_state in enumerate( group_state ): rep_index = rep_state['__index__'] max_index = max( max_index, rep_index ) rep_prefix = "%s_%d|" % ( key, rep_index ) if group_old_errors: rep_old_errors = group_old_errors[i] else: rep_old_errors = {} rep_errors = self.update_state( trans, input.inputs, rep_state, incoming, prefix=rep_prefix, context=context, update_only=update_only, old_errors=rep_old_errors, item_callback=item_callback ) if rep_errors: any_group_errors = True group_errors.append( rep_errors ) else: group_errors.append( {} ) # Add new fileupload as needed offset = 1 while len( writable_files ) > len( group_state ): new_state = {} new_state['__index__'] = max_index + offset offset += 1 self.fill_in_new_state( trans, input.inputs, new_state, context ) group_state.append( new_state ) if any_group_errors: group_errors.append( {} ) # Were there *any* errors for any repetition? if any_group_errors: errors[input.name] = group_errors else: if key not in incoming \ and "__force_update__" + key not in incoming \ and update_only: # No new value provided, and we are only updating, so keep # the old value (which should already be in the state) and # preserve the old error message. if input.name in old_errors: errors[ input.name ] = old_errors[ input.name ] else: incoming_value = get_incoming_value( incoming, key, None ) value, error = check_param( trans, input, incoming_value, context ) # If a callback was provided, allow it to process the value if item_callback: old_value = state.get( input.name, None ) value, error = item_callback( trans, key, input, value, error, old_value, context ) if error: errors[ input.name ] = error state[ input.name ] = value return errors def get_static_param_values( self, trans ): """ Returns a map of parameter names and values if the tool does not require any user input. Will raise an exception if any parameter does require input. """ args = dict() for key, param in self.inputs.iteritems(): if isinstance( param, HiddenToolParameter ): args[key] = param.value elif isinstance( param, BaseURLToolParameter ): args[key] = param.get_value( trans ) else: raise Exception( "Unexpected parameter type" ) return args def execute( self, trans, incoming={}, set_output_hid=True ): """ Execute the tool using parameter values in `incoming`. This just dispatches to the `ToolAction` instance specified by `self.tool_action`. In general this will create a `Job` that when run will build the tool's outputs, e.g. `DefaultToolAction`. """ return self.tool_action.execute( self, trans, incoming=incoming, set_output_hid=set_output_hid ) def params_to_strings( self, params, app ): return params_to_strings( self.inputs, params, app ) def params_from_strings( self, params, app, ignore_errors=False ): return params_from_strings( self.inputs, params, app, ignore_errors ) def check_and_update_param_values( self, values, trans ): """ Check that all parameters have values, and fill in with default values where neccesary. This could be called after loading values from a database in case new parameters have been added. """ messages = {} self.check_and_update_param_values_helper( self.inputs, values, trans, messages ) return messages def check_and_update_param_values_helper( self, inputs, values, trans, messages, context=None, prefix="" ): """ Recursive helper for `check_and_update_param_values_helper` """ context = ExpressionContext( values, context ) for input in inputs.itervalues(): # No value, insert the default if input.name not in values: messages[ input.name ] = "No value found for '%s%s', used default" % ( prefix, input.label ) values[ input.name ] = input.get_initial_value( trans, context ) # Value, visit recursively as usual else: if isinstance( input, Repeat ): for i, d in enumerate( values[ input.name ] ): rep_prefix = prefix + "%s %d > " % ( input.title, i + 1 ) self.check_and_update_param_values_helper( input.inputs, d, trans, messages, context, rep_prefix ) elif isinstance( input, Conditional ): group_values = values[ input.name ] if input.test_param.name not in group_values: # No test param invalidates the whole conditional values[ input.name ] = group_values = input.get_initial_value( trans, context ) messages[ input.test_param.name ] = "No value found for '%s%s', used default" % ( prefix, input.test_param.label ) current_case = group_values['__current_case__'] for child_input in input.cases[current_case].inputs.itervalues(): messages[ child_input.name ] = "Value no longer valid for '%s%s', replaced with default" % ( prefix, child_input.label ) else: current = group_values["__current_case__"] self.check_and_update_param_values_helper( input.cases[current].inputs, group_values, trans, messages, context, prefix ) else: # Regular tool parameter, no recursion needed pass def handle_unvalidated_param_values( self, input_values, app ): """ Find any instances of `UnvalidatedValue` within input_values and validate them (by calling `ToolParameter.from_html` and `ToolParameter.validate`). """ # No validation is done when check_values is False if not self.check_values: return self.handle_unvalidated_param_values_helper( self.inputs, input_values, app ) def handle_unvalidated_param_values_helper( self, inputs, input_values, app, context=None, prefix="" ): """ Recursive helper for `handle_unvalidated_param_values` """ context = ExpressionContext( input_values, context ) for input in inputs.itervalues(): if isinstance( input, Repeat ): for i, d in enumerate( input_values[ input.name ] ): rep_prefix = prefix + "%s %d > " % ( input.title, i + 1 ) self.handle_unvalidated_param_values_helper( input.inputs, d, app, context, rep_prefix ) elif isinstance( input, Conditional ): values = input_values[ input.name ] current = values["__current_case__"] # NOTE: The test param doesn't need to be checked since # there would be no way to tell what case to use at # workflow build time. However I'm not sure if we are # actually preventing such a case explicately. self.handle_unvalidated_param_values_helper( input.cases[current].inputs, values, app, context, prefix ) else: # Regular tool parameter value = input_values[ input.name ] if isinstance( value, UnvalidatedValue ): try: # Convert from html representation if value.value is None: # If value.value is None, it could not have been # submited via html form and therefore .from_html # can't be guaranteed to work value = None else: value = input.from_html( value.value, None, context ) # Do any further validation on the value input.validate( value, None ) except Exception, e: # Wrap an re-raise any generated error so we can # generate a more informative message v = input.value_to_display_text( value, self.app ) message = "Failed runtime validation of %s%s (%s)" \ % ( prefix, input.label, e ) raise LateValidationError( message ) input_values[ input.name ] = value def handle_job_failure_exception( self, e ): """ Called by job.fail when an exception is generated to allow generation of a better error message (returning None yields the default behavior) """ message = None # If the exception was generated by late validation, use its error # message (contains the parameter name and value) if isinstance( e, LateValidationError ): message = e.message return message def build_param_dict( self, incoming, input_datasets, output_datasets, output_paths, job_working_directory ): """ Build the dictionary of parameters for substituting into the command line. Each value is wrapped in a `InputValueWrapper`, which allows all the attributes of the value to be used in the template, *but* when the __str__ method is called it actually calls the `to_param_dict_value` method of the associated input. """ param_dict = dict() # All parameters go into the param_dict param_dict.update( incoming ) # Wrap parameters as neccesary def wrap_values( inputs, input_values ): for input in inputs.itervalues(): if isinstance( input, Repeat ): for d in input_values[ input.name ]: wrap_values( input.inputs, d ) elif isinstance( input, Conditional ): values = input_values[ input.name ] current = values["__current_case__"] wrap_values( input.cases[current].inputs, values ) elif isinstance( input, DataToolParameter ): ## FIXME: We're populating param_dict with conversions when ## wrapping values, this should happen as a separate ## step before wrapping (or call this wrapping step ## something more generic) (but iterating this same ## list twice would be wasteful) # Add explicit conversions by name to current parent for conversion_name, conversion_extensions, conversion_datatypes in input.conversions: # If we are at building cmdline step, then converters # have already executed conv_ext, converted_dataset = input_values[ input.name ].find_conversion_destination( conversion_datatypes ) # When dealing with optional inputs, we'll provide a # valid extension to be used for None converted dataset if not conv_ext: conv_ext = conversion_extensions[0] # input_values[ input.name ] is None when optional # dataset, 'conversion' of optional dataset should # create wrapper around NoneDataset for converter output if input_values[ input.name ] and not converted_dataset: # Input that converter is based from has a value, # but converted dataset does not exist raise Exception( 'A path for explicit datatype conversion has not been found: %s --/--> %s' % ( input_values[ input.name ].extension, conversion_extensions ) ) else: # Trick wrapper into using target conv ext (when # None) without actually being a tool parameter input_values[ conversion_name ] = \ DatasetFilenameWrapper( converted_dataset, datatypes_registry = self.app.datatypes_registry, tool = Bunch( conversion_name = Bunch( extensions = conv_ext ) ), name = conversion_name ) # Wrap actual input dataset input_values[ input.name ] = \ DatasetFilenameWrapper( input_values[ input.name ], datatypes_registry = self.app.datatypes_registry, tool = self, name = input.name ) elif isinstance( input, SelectToolParameter ): input_values[ input.name ] = SelectToolParameterWrapper( input, input_values[ input.name ], self.app, other_values = param_dict ) else: input_values[ input.name ] = InputValueWrapper( input, input_values[ input.name ], param_dict ) # HACK: only wrap if check_values is not false, this deals with external # tools where the inputs don't even get passed through. These # tools (e.g. UCSC) should really be handled in a special way. if self.check_values: wrap_values( self.inputs, param_dict ) ## FIXME: when self.check_values==True, input datasets are being wrapped ## twice (above and below, creating 2 separate ## DatasetFilenameWrapper objects - first is overwritten by ## second), is this necessary? - if we get rid of this way to ## access children, can we stop this redundancy, or is there ## another reason for this? ## - Only necessary when self.check_values is False (==external dataset ## tool?: can this be abstracted out as part of being a datasouce tool?) ## - But we still want (ALWAYS) to wrap input datasets (this should be ## checked to prevent overhead of creating a new object?) # Additionally, datasets go in the param dict. We wrap them such that # if the bare variable name is used it returns the filename (for # backwards compatibility). We also add any child datasets to the # the param dict encoded as: # "_CHILD___{dataset_name}___{child_designation}", # but this should be considered DEPRECATED, instead use: # $dataset.get_child( 'name' ).filename for name, data in input_datasets.items(): param_dict[name] = DatasetFilenameWrapper( data, datatypes_registry = self.app.datatypes_registry, tool = self, name = name ) if data: for child in data.children: param_dict[ "_CHILD___%s___%s" % ( name, child.designation ) ] = DatasetFilenameWrapper( child ) for name, hda in output_datasets.items(): # Write outputs to the working directory (for security purposes) # if desired. if self.app.config.outputs_to_working_directory: try: false_path = [ dp.false_path for dp in output_paths if dp.real_path == hda.file_name ][0] param_dict[name] = DatasetFilenameWrapper( hda, false_path = false_path ) open( false_path, 'w' ).close() except IndexError: log.warning( "Unable to determine alternate path for writing job outputs, outputs will be written to their real paths" ) param_dict[name] = DatasetFilenameWrapper( hda ) else: param_dict[name] = DatasetFilenameWrapper( hda ) # Provide access to a path to store additional files # TODO: path munging for cluster/dataset server relocatability param_dict[name].files_path = os.path.abspath(os.path.join( job_working_directory, "dataset_%s_files" % (hda.dataset.id) )) for child in hda.children: param_dict[ "_CHILD___%s___%s" % ( name, child.designation ) ] = DatasetFilenameWrapper( child ) for out_name, output in self.outputs.iteritems(): if out_name not in param_dict and output.filters: # Assume the reason we lack this output is because a filter # failed to pass; for tool writing convienence, provide a # NoneDataset param_dict[ out_name ] = NoneDataset( datatypes_registry = self.app.datatypes_registry, ext = output.format ) # We add access to app here, this allows access to app.config, etc param_dict['__app__'] = RawObjectWrapper( self.app ) # More convienent access to app.config.new_file_path; we don't need to # wrap a string, but this method of generating additional datasets # should be considered DEPRECATED # TODO: path munging for cluster/dataset server relocatability param_dict['__new_file_path__'] = os.path.abspath(self.app.config.new_file_path) # The following points to location (xxx.loc) files which are pointers # to locally cached data param_dict['GALAXY_DATA_INDEX_DIR'] = self.app.config.tool_data_path # For the upload tool, we need to know the root directory and the # datatypes conf path, so we can load the datatypes registry param_dict['GALAXY_ROOT_DIR'] = os.path.abspath( self.app.config.root ) param_dict['GALAXY_DATATYPES_CONF_FILE'] = os.path.abspath( self.app.config.datatypes_config ) # Return the dictionary of parameters return param_dict def build_param_file( self, param_dict, directory=None ): """ Build temporary file for file based parameter transfer if needed """ if self.command and "$param_file" in self.command: fd, param_filename = tempfile.mkstemp( dir=directory ) os.close( fd ) f = open( param_filename, "wt" ) for key, value in param_dict.items(): # parameters can be strings or lists of strings, coerce to list if type(value) != type([]): value = [ value ] for elem in value: f.write( '%s=%s\n' % (key, elem) ) f.close() param_dict['param_file'] = param_filename return param_filename else: return None def build_config_files( self, param_dict, directory=None ): """ Build temporary file for file based parameter transfer if needed """ config_filenames = [] for name, filename, template_text in self.config_files: # If a particular filename was forced by the config use it if filename is not None: if directory is None: raise Exception( "Config files with fixed filenames require a working directory" ) config_filename = os.path.join( directory, filename ) else: fd, config_filename = tempfile.mkstemp( dir=directory ) os.close( fd ) f = open( config_filename, "wt" ) f.write( fill_template( template_text, context=param_dict ) ) f.close() param_dict[name] = config_filename config_filenames.append( config_filename ) return config_filenames def build_command_line( self, param_dict ): """ Build command line to invoke this tool given a populated param_dict """ command_line = None if not self.command: return try: # Substituting parameters into the command command_line = fill_template( self.command, context=param_dict ) # Remove newlines from command line command_line = command_line.replace( "\n", " " ).replace( "\r", " " ) except Exception, e: # Modify exception message to be more clear #e.args = ( 'Error substituting into command line. Params: %r, Command: %s' % ( param_dict, self.command ) ) raise return command_line def build_dependency_shell_commands( self ): """ Return a list of commands to be run to populate the current environment to include this tools requirements. """ commands = [] for requirement in self.requirements: # TODO: currently only supporting requirements of type package, # need to implement some mechanism for mapping other types # back to packages log.debug( "Dependency %s", requirement.name ) if requirement.type == 'package': script_file, base_path, version = self.app.toolbox.dependency_manager.find_dep( requirement.name, requirement.version ) if script_file is None: log.warn( "Failed to resolve dependency on '%s', ignoring", requirement.name ) else: commands.append( 'PACKAGE_BASE=%s source %s' % ( base_path, script_file ) ) return commands def build_redirect_url_params( self, param_dict ): """ Substitute parameter values into self.redirect_url_params """ if not self.redirect_url_params: return redirect_url_params = None # Substituting parameter values into the url params redirect_url_params = fill_template( self.redirect_url_params, context=param_dict ) # Remove newlines redirect_url_params = redirect_url_params.replace( "\n", " " ).replace( "\r", " " ) return redirect_url_params def parse_redirect_url( self, data, param_dict ): """ Parse the REDIRECT_URL tool param. Tools that send data to an external application via a redirect must include the following 3 tool params: 1) REDIRECT_URL - the url to which the data is being sent 2) DATA_URL - the url to which the receiving application will send an http post to retrieve the Galaxy data 3) GALAXY_URL - the url to which the external application may post data as a response """ redirect_url = param_dict.get( 'REDIRECT_URL' ) redirect_url_params = self.build_redirect_url_params( param_dict ) # Add the parameters to the redirect url. We're splitting the param # string on '**^**' because the self.parse() method replaced white # space with that separator. params = redirect_url_params.split( '**^**' ) rup_dict = {} for param in params: p_list = param.split( '=' ) p_name = p_list[0] p_val = p_list[1] rup_dict[ p_name ] = p_val DATA_URL = param_dict.get( 'DATA_URL', None ) assert DATA_URL is not None, "DATA_URL parameter missing in tool config." DATA_URL += "/%s/display" % str( data.id ) redirect_url += "?DATA_URL=%s" % DATA_URL # Add the redirect_url_params to redirect_url for p_name in rup_dict: redirect_url += "&%s=%s" % ( p_name, rup_dict[ p_name ] ) # Add the current user email to redirect_url if data.history.user: USERNAME = str( data.history.user.email ) else: USERNAME = 'Anonymous' redirect_url += "&USERNAME=%s" % USERNAME return redirect_url def call_hook( self, hook_name, *args, **kwargs ): """ Call the custom code hook function identified by 'hook_name' if any, and return the results """ try: code = self.get_hook( hook_name ) if code: return code( *args, **kwargs ) except Exception, e: e.args = ( "Error in '%s' hook '%s', original message: %s" % ( self.name, hook_name, e.args[0] ) ) raise def exec_before_job( self, app, inp_data, out_data, param_dict={} ): pass def exec_after_process( self, app, inp_data, out_data, param_dict, job = None ): pass def job_failed( self, job_wrapper, message, exception = False ): """ Called when a job has failed """ pass def collect_associated_files( self, output, job_working_directory ): """ Find extra files in the job working directory and move them into the appropriate dataset's files directory """ for name, hda in output.items(): temp_file_path = os.path.join( job_working_directory, "dataset_%s_files" % ( hda.dataset.id ) ) try: if len( os.listdir( temp_file_path ) ) > 0: store_file_path = os.path.join( os.path.join( self.app.config.file_path, *directory_hash_id( hda.dataset.id ) ), "dataset_%d_files" % hda.dataset.id ) shutil.move( temp_file_path, store_file_path ) # Fix permissions for basedir, dirs, files in os.walk( store_file_path ): util.umask_fix_perms( basedir, self.app.config.umask, 0777, self.app.config.gid ) for file in files: path = os.path.join( basedir, file ) # Ignore symlinks if os.path.islink( path ): continue util.umask_fix_perms( path, self.app.config.umask, 0666, self.app.config.gid ) except: continue def collect_child_datasets( self, output): """ Look for child dataset files, create HDA and attach to parent. """ children = {} # Loop through output file names, looking for generated children in # form of 'child_parentId_designation_visibility_extension' for name, outdata in output.items(): for filename in glob.glob(os.path.join(self.app.config.new_file_path,"child_%i_*" % outdata.id) ): if not name in children: children[name] = {} fields = os.path.basename(filename).split("_") fields.pop(0) parent_id = int(fields.pop(0)) designation = fields.pop(0) visible = fields.pop(0).lower() if visible == "visible": visible = True else: visible = False ext = fields.pop(0).lower() child_dataset = self.app.model.HistoryDatasetAssociation( extension=ext, parent_id=outdata.id, designation=designation, visible=visible, dbkey=outdata.dbkey, create_dataset=True, sa_session=self.sa_session ) self.app.security_agent.copy_dataset_permissions( outdata.dataset, child_dataset.dataset ) # Move data from temp location to dataset location shutil.move( filename, child_dataset.file_name ) self.sa_session.add( child_dataset ) self.sa_session.flush() child_dataset.set_size() child_dataset.name = "Secondary Dataset (%s)" % ( designation ) child_dataset.init_meta() child_dataset.set_meta() child_dataset.set_peek() # Associate new dataset with job job = None for assoc in outdata.creating_job_associations: job = assoc.job break if job: assoc = self.app.model.JobToOutputDatasetAssociation( '__new_child_file_%s|%s__' % ( name, designation ), child_dataset ) assoc.job = job self.sa_session.add( assoc ) self.sa_session.flush() child_dataset.state = outdata.state self.sa_session.add( child_dataset ) self.sa_session.flush() # Add child to return dict children[name][designation] = child_dataset # Need to update all associated output hdas, i.e. history was # shared with job running for dataset in outdata.dataset.history_associations: if outdata == dataset: continue # Create new child dataset child_data = child_dataset.copy( parent_id = dataset.id ) self.sa_session.add( child_dataset ) self.sa_session.flush() return children def collect_primary_datasets( self, output): """ Find any additional datasets generated by a tool and attach (for cases where number of outputs is not known in advance). """ primary_datasets = {} # Loop through output file names, looking for generated primary # datasets in form of: # 'primary_associatedWithDatasetID_designation_visibility_extension(_DBKEY)' for name, outdata in output.items(): for filename in glob.glob(os.path.join(self.app.config.new_file_path,"primary_%i_*" % outdata.id) ): if not name in primary_datasets: primary_datasets[name] = {} fields = os.path.basename(filename).split("_") fields.pop(0) parent_id = int(fields.pop(0)) designation = fields.pop(0) visible = fields.pop(0).lower() if visible == "visible": visible = True else: visible = False ext = fields.pop(0).lower() dbkey = outdata.dbkey if fields: dbkey = fields[ 0 ] # Create new primary dataset primary_data = self.app.model.HistoryDatasetAssociation( extension=ext, designation=designation, visible=visible, dbkey=dbkey, create_dataset=True, sa_session=self.sa_session ) self.app.security_agent.copy_dataset_permissions( outdata.dataset, primary_data.dataset ) self.sa_session.add( primary_data ) self.sa_session.flush() # Move data from temp location to dataset location shutil.move( filename, primary_data.file_name ) primary_data.set_size() primary_data.name = outdata.name primary_data.info = outdata.info primary_data.init_meta( copy_from=outdata ) primary_data.dbkey = dbkey primary_data.set_meta() primary_data.set_peek() # Associate new dataset with job job = None for assoc in outdata.creating_job_associations: job = assoc.job break if job: assoc = self.app.model.JobToOutputDatasetAssociation( '__new_primary_file_%s|%s__' % ( name, designation ), primary_data ) assoc.job = job self.sa_session.add( assoc ) self.sa_session.flush() primary_data.state = outdata.state self.sa_session.add( primary_data ) self.sa_session.flush() outdata.history.add_dataset( primary_data ) # Add dataset to return dict primary_datasets[name][designation] = primary_data # Need to update all associated output hdas, i.e. history was # shared with job running for dataset in outdata.dataset.history_associations: if outdata == dataset: continue new_data = primary_data.copy() dataset.history.add( new_data ) self.sa_session.add( new_data ) self.sa_session.flush() return primary_datasets class DataSourceTool( Tool ): """ Alternate implementation of Tool for data_source tools -- those that allow the user to query and extract data from another web site. """ tool_type = 'data_source' def _build_GALAXY_URL_parameter( self ): return ToolParameter.build( self, ElementTree.XML( '' % self.id ) ) def parse_inputs( self, root ): Tool.parse_inputs( self, root ) if 'GALAXY_URL' not in self.inputs: self.inputs[ 'GALAXY_URL' ] = self._build_GALAXY_URL_parameter() def exec_before_job( self, app, inp_data, out_data, param_dict={} ): # TODO: Allow for a generic way for all Tools to have output dataset # properties be set to input parameter values as defined in a # tool XML dbkey = param_dict.get( 'dbkey' ) organism = param_dict.get( 'organism' ) table = param_dict.get( 'table' ) description = param_dict.get( 'description' ) info = param_dict.get( 'info' ) if description == 'range': description = param_dict.get( 'position', '' ) if not description: description = 'unknown position' gb_landmark_region = param_dict.get( 'q' ) data_type = param_dict.get( 'data_type' ) items = out_data.items() for name, data in items: if organism and table and description: # This is UCSC data.name = '%s on %s: %s (%s)' % ( data.name, organism, table, description ) elif gb_landmark_region: # This is GBrowse data.name = '%s on %s' % ( data.name, gb_landmark_region ) data.info = info data.dbkey = dbkey if data_type not in app.datatypes_registry.datatypes_by_extension: # Setting data_type to tabular will force the data to be sniffed in exec_after_process() data_type = 'tabular' data.change_datatype( data_type ) # Store external data source's request parameters temporarily in # output file. In case the config setting for # "outputs_to_working_directory" is True, we must write to the # DatasetFilenameWrapper object in the param_dict since it's # "false_path" attribute is the temporary path to the output dataset # ( until the job is run ). However, even if the # "outputs_to_working_directory" setting is False, we can still # open the file the same way for temporarily storing the request # parameters. ## TODO: Input parameters should be jsonified and written into a ## and passed to data_source.py, instead of ## writing tab separated key, value pairs to the output file out = open( str( param_dict.get( name ) ), 'w' ) for key, value in param_dict.items(): print >> out, '%s\t%s' % ( key, value ) out.close() def exec_after_process( self, app, inp_data, out_data, param_dict, job = None ): name, data = out_data.items()[0] data.set_size() #TODO: these should be already be set before the tool runs: if data.state == data.states.OK: data.name = param_dict.get( 'name', data.name ) data.info = param_dict.get( 'info', data.name ) data.dbkey = param_dict.get( 'dbkey', data.dbkey ) data.extension = param_dict.get( 'data_type', data.extension ) #TODO: these should be possible as part of data_source.py and external set_meta, see the upload tool: if data.extension in [ 'txt', 'tabular' ]: data_type = sniff.guess_ext( data.file_name, sniff_order=app.datatypes_registry.sniff_order, is_multi_byte=self.is_multi_byte ) if data.extension != data_type: data.change_datatype( data_type ) elif not isinstance( data.datatype, datatypes.interval.Bed ) and isinstance( data.datatype, datatypes.interval.Interval ): if data.missing_meta(): data.change_datatype( 'tabular' ) data.set_peek() self.sa_session.add( data ) self.sa_session.flush() class AsyncDataSourceTool( DataSourceTool ): tool_type = 'data_source_async' def _build_GALAXY_URL_parameter( self ): return ToolParameter.build( self, ElementTree.XML( '' % self.id ) ) class DataDestinationTool( Tool ): tool_type = 'data_destination' class SetMetadataTool( Tool ): """ Tool implementation for special tool that sets metadata on an existing dataset. """ tool_type = 'set_metadata' def exec_after_process( self, app, inp_data, out_data, param_dict, job = None ): for name, dataset in inp_data.iteritems(): external_metadata = galaxy.datatypes.metadata.JobExternalOutputMetadataWrapper( job ) if external_metadata.external_metadata_set_successfully( dataset, app.model.context ): dataset.metadata.from_JSON_dict( external_metadata.get_output_filenames_by_dataset( dataset, app.model.context ).filename_out ) else: dataset._state = model.Dataset.states.FAILED_METADATA self.sa_session.add( dataset ) self.sa_session.flush() return # If setting external metadata has failed, how can we inform the # user? For now, we'll leave the default metadata and set the state # back to its original. dataset.datatype.after_setting_metadata( dataset ) if job and job.tool_id == '1.0.0': dataset.state = param_dict.get( '__ORIGINAL_DATASET_STATE__' ) else: # Revert dataset.state to fall back to dataset.dataset.state dataset._state = None # Need to reset the peek, which may rely on metadata dataset.set_peek() self.sa_session.add( dataset ) self.sa_session.flush() def job_failed( self, job_wrapper, message, exception = False ): job = job_wrapper.sa_session.query( model.Job ).get( job_wrapper.job_id ) if job: inp_data = {} for dataset_assoc in job.input_datasets: inp_data[dataset_assoc.name] = dataset_assoc.dataset return self.exec_after_process( job_wrapper.app, inp_data, {}, job_wrapper.get_param_dict(), job = job ) class ExportHistoryTool( Tool ): tool_type = 'export_history' # Populate tool_type to ToolClass mappings tool_types = {} for tool_class in [ Tool, DataDestinationTool, SetMetadataTool, DataSourceTool, AsyncDataSourceTool ]: tool_types[ tool_class.tool_type ] = tool_class # ---- Utility classes to be factored out ----------------------------------- class BadValue( object ): def __init__( self, value ): self.value = value class RawObjectWrapper( object ): """ Wraps an object so that __str__ returns module_name:class_name. """ def __init__( self, obj ): self.obj = obj def __str__( self ): return "%s:%s" % (self.obj.__module__, self.obj.__class__.__name__) def __getattr__( self, key ): return getattr( self.obj, key ) class InputValueWrapper( object ): """ Wraps an input so that __str__ gives the "param_dict" representation. """ def __init__( self, input, value, other_values={} ): self.input = input self.value = value self._other_values = other_values def __str__( self ): return self.input.to_param_dict_string( self.value, self._other_values ) def __getattr__( self, key ): return getattr( self.value, key ) class SelectToolParameterWrapper( object ): """ Wraps a SelectTooParameter so that __str__ returns the selected value, but all other attributes are accessible. """ def __init__( self, input, value, app, other_values={} ): self.input = input self.value = value self.input.value_label = input.value_to_display_text( value, app ) self._other_values = other_values def __str__( self ): return self.input.to_param_dict_string( self.value, other_values = self._other_values ) def __getattr__( self, key ): return getattr( self.input, key ) class DatasetFilenameWrapper( object ): """ Wraps a dataset so that __str__ returns the filename, but all other attributes are accessible. """ class MetadataWrapper: """ Wraps a Metadata Collection to return MetadataParameters wrapped according to the metadata spec. Methods implemented to match behavior of a Metadata Collection. """ def __init__( self, metadata ): self.metadata = metadata def __getattr__( self, name ): rval = self.metadata.get( name, None ) if name in self.metadata.spec: if rval is None: rval = self.metadata.spec[name].no_value rval = self.metadata.spec[name].param.to_string( rval ) # Store this value, so we don't need to recalculate if needed # again setattr( self, name, rval ) return rval def __nonzero__( self ): return self.metadata.__nonzero__() def __iter__( self ): return self.metadata.__iter__() def get( self, key, default=None ): try: return getattr( self, key ) except: return default def items( self ): return iter( [ ( k, self.get( k ) ) for k, v in self.metadata.items() ] ) def __init__( self, dataset, datatypes_registry = None, tool = None, name = None, false_path = None ): if not dataset: try: # TODO: allow this to work when working with grouping ext = tool.inputs[name].extensions[0] except: ext = 'data' self.dataset = NoneDataset( datatypes_registry = datatypes_registry, ext = ext ) else: self.dataset = dataset self.metadata = self.MetadataWrapper( dataset.metadata ) self.false_path = false_path def __str__( self ): if self.false_path is not None: return self.false_path else: return self.dataset.file_name def __getattr__( self, key ): if self.false_path is not None and key == 'file_name': return self.false_path else: return getattr( self.dataset, key ) def json_fix( val ): if isinstance( val, list ): return [ json_fix( v ) for v in val ] elif isinstance( val, dict ): return dict( [ ( json_fix( k ), json_fix( v ) ) for ( k, v ) in val.iteritems() ] ) elif isinstance( val, unicode ): return val.encode( "utf8" ) else: return val def get_incoming_value( incoming, key, default ): if "__" + key + "__is_composite" in incoming: composite_keys = incoming["__" + key + "__keys"].split() value = dict() for composite_key in composite_keys: value[composite_key] = incoming[key + "_" + composite_key] return value else: return incoming.get( key, default ) class InterruptedUpload( Exception ): pass