""" Basic tool parameters. """ import logging, string, sys, os, os.path from elementtree.ElementTree import XML, Element from galaxy import config, datatypes, util from galaxy.web import form_builder from galaxy.util.bunch import Bunch from galaxy.util import string_as_bool, sanitize_param from sanitize import ToolParameterSanitizer import validation, dynamic_options # For BaseURLToolParameter from galaxy.web import url_for import galaxy.model log = logging.getLogger(__name__) class ToolParameter( object ): """ Describes a parameter accepted by a tool. This is just a simple stub at the moment but in the future should encapsulate more complex parameters (lists of valid choices, validation logic, ...) """ def __init__( self, tool, param, context=None ): self.tool = tool self.refresh_on_change = False self.refresh_on_change_values = [] self.name = param.get("name") self.type = param.get("type") self.label = util.xml_text(param, "label") self.help = util.xml_text(param, "help") self.sanitizer = param.find( "sanitizer" ) if self.sanitizer is not None: self.sanitizer = ToolParameterSanitizer.from_element( self.sanitizer ) self.html = "no html set" self.repeat = param.get("repeat", None) self.condition = param.get( "condition", None ) self.validators = [] for elem in param.findall("validator"): self.validators.append( validation.Validator.from_element( self, elem ) ) def get_label( self ): """Return user friendly name for the parameter""" if self.label: return self.label else: return self.name def get_html_field( self, trans=None, value=None, other_values={} ): raise TypeError( "Abstract Method" ) def get_html( self, trans=None, value=None, other_values={}): """ Returns the html widget corresponding to the parameter. Optionally attempt to retain the current value specific by 'value' """ return self.get_html_field( trans, value, other_values ).get_html() def from_html( self, value, trans=None, other_values={} ): """ Convert a value from an HTML POST into the parameters preferred value format. """ return value def get_initial_value( self, trans, context ): """ Return the starting value of the parameter """ return None def get_required_enctype( self ): """ If this parameter needs the form to have a specific encoding return it, otherwise return None (indicating compatibility with any encoding) """ return None def get_dependencies( self ): """ Return the names of any other parameters this parameter depends on """ return [] def filter_value( self, value, trans=None, other_values={} ): """ Parse the value returned by the view into a form usable by the tool OR raise a ValueError. """ return value def to_string( self, value, app ): """Convert a value to a string representation suitable for persisting""" return str( value ) def to_python( self, value, app ): """Convert a value created with to_string back to an object representation""" return value def value_to_basic( self, value, app ): if isinstance( value, RuntimeValue ): return { "__class__": "RuntimeValue" } return self.to_string( value, app ) def value_from_basic( self, value, app, ignore_errors=False ): # HACK: Some things don't deal with unicode well, psycopg problem? if type( value ) == unicode: value = str( value ) # Handle Runtime values (valid for any parameter?) if isinstance( value, dict ) and '__class__' in value and value['__class__'] == "RuntimeValue": return RuntimeValue() # Delegate to the 'to_python' method if ignore_errors: try: return self.to_python( value, app ) except: return value else: return self.to_python( value, app ) def value_to_display_text( self, value, app ): """ Convert a value to a text representation suitable for displaying to the user """ return value def to_param_dict_string( self, value, other_values={} ): value = str( value ) if self.tool is None or self.tool.options.sanitize: if self.sanitizer: value = self.sanitizer.sanitize_param( value ) else: value = sanitize_param( value ) return value def validate( self, value, history=None ): for validator in self.validators: validator.validate( value, history ) @classmethod def build( cls, tool, param ): """Factory method to create parameter of correct type""" param_type = param.get("type") if not param_type or param_type not in parameter_types: raise ValueError( "Unknown tool parameter type '%s'" % param_type ) else: return parameter_types[param_type]( tool, param ) class TextToolParameter( ToolParameter ): """ Parameter that can take on any text value. >>> p = TextToolParameter( None, XML( '' ) ) >>> print p.name blah >>> print p.get_html() >>> print p.get_html( value="meh" ) """ def __init__( self, tool, elem ): ToolParameter.__init__( self, tool, elem ) self.size = elem.get( 'size' ) self.value = elem.get( 'value' ) self.area = string_as_bool( elem.get( 'area', False ) ) def get_html_field( self, trans=None, value=None, other_values={} ): if self.area: return form_builder.TextArea( self.name, self.size, value or self.value ) else: return form_builder.TextField( self.name, self.size, value or self.value ) def get_initial_value( self, trans, context ): return self.value class IntegerToolParameter( TextToolParameter ): """ Parameter that takes an integer value. >>> p = IntegerToolParameter( None, XML( '' ) ) >>> print p.name blah >>> print p.get_html() >>> type( p.from_html( "10" ) ) >>> type( p.from_html( "bleh" ) ) Traceback (most recent call last): ... ValueError: An integer is required """ def __init__( self, tool, elem ): TextToolParameter.__init__( self, tool, elem ) if self.value: try: int( self.value ) except: raise ValueError( "An integer is required" ) elif self.value is None: raise ValueError( "The settings for this field require a 'value' setting and optionally a default value which must be an integer" ) def get_html_field( self, trans=None, value=None, other_values={} ): if isinstance( value, int ): value = str( value ) return super( IntegerToolParameter, self ).get_html_field( trans=trans, value=value, other_values=other_values ) def from_html( self, value, trans=None, other_values={} ): try: return int( value ) except: raise ValueError( "An integer is required" ) def to_python( self, value, app ): return int( value ) def get_initial_value( self, trans, context ): if self.value: return int( self.value ) else: return None class FloatToolParameter( TextToolParameter ): """ Parameter that takes a real number value. >>> p = FloatToolParameter( None, XML( '' ) ) >>> print p.name blah >>> print p.get_html() >>> type( p.from_html( "36.1" ) ) >>> type( p.from_html( "bleh" ) ) Traceback (most recent call last): ... ValueError: A real number is required """ def __init__( self, tool, elem ): TextToolParameter.__init__( self, tool, elem ) if self.value: try: float( self.value ) except: raise ValueError( "A real number is required" ) elif self.value is None: raise ValueError( "The settings for this field require a 'value' setting and optionally a default value which must be a real number" ) def get_html_field( self, trans=None, value=None, other_values={} ): if isinstance( value, float ): value = str( value ) return super( FloatToolParameter, self ).get_html_field( trans=trans, value=value, other_values=other_values ) def from_html( self, value, trans=None, other_values={} ): try: return float( value ) except: raise ValueError( "A real number is required" ) def to_python( self, value, app ): return float( value ) def get_initial_value( self, trans, context ): try: return float( self.value ) except: return None class BooleanToolParameter( ToolParameter ): """ Parameter that takes one of two values. >>> p = BooleanToolParameter( None, XML( '' ) ) >>> print p.name blah >>> print p.get_html() >>> print p.from_html( ["true","true"] ) True >>> print p.to_param_dict_string( True ) bulletproof vests >>> print p.from_html( ["true"] ) False >>> print p.to_param_dict_string( False ) cellophane chests """ def __init__( self, tool, elem ): ToolParameter.__init__( self, tool, elem ) self.truevalue = elem.get( 'truevalue', 'true' ) self.falsevalue = elem.get( 'falsevalue', 'false' ) self.checked = string_as_bool( elem.get( 'checked' ) ) def get_html_field( self, trans=None, value=None, other_values={} ): checked = self.checked if value: checked = form_builder.CheckboxField.is_checked( value ) return form_builder.CheckboxField( self.name, checked ) def from_html( self, value, trans=None, other_values={} ): return form_builder.CheckboxField.is_checked( value ) def to_python( self, value, app ): return ( value == 'True' ) def get_initial_value( self, trans, context ): return self.checked def to_param_dict_string( self, value, other_values={} ): if value: return self.truevalue else: return self.falsevalue class FileToolParameter( ToolParameter ): """ Parameter that takes an uploaded file as a value. >>> p = FileToolParameter( None, XML( '' ) ) >>> print p.name blah >>> print p.get_html() >>> p = FileToolParameter( None, XML( '' ) ) >>> print p.get_html() """ def __init__( self, tool, elem ): """ Example: C{} """ ToolParameter.__init__( self, tool, elem ) self.ajax = string_as_bool( elem.get( 'ajax-upload' ) ) def get_html_field( self, trans=None, value=None, other_values={} ): return form_builder.FileField( self.name, ajax = self.ajax, value = value ) def from_html( self, value, trans=None, other_values={} ): # Middleware or proxies may encode files in special ways (TODO: this # should be pluggable) if type( value ) == dict: upload_store = self.tool.app.config.nginx_upload_store assert upload_store, \ "Request appears to have been processed by nginx_upload_module \ but Galaxy is not configured to recognize it" # Check that the file is in the right location local_filename = os.path.abspath( value['path'] ) assert local_filename.startswith( upload_store ), \ "Filename provided by nginx is not in correct directory" value = dict( filename = value["name"], local_filename = local_filename ) return value def get_required_enctype( self ): """ File upload elements require the multipart/form-data encoding """ return "multipart/form-data" def to_string( self, value, app ): if value in [ None, '' ]: return None elif isinstance( value, unicode ) or isinstance( value, str ): return value elif isinstance( value, dict ): # or should we jsonify? try: return value['local_filename'] except: return None raise Exception( "FileToolParameter cannot be persisted" ) def to_python( self, value, app ): if value is None: return None elif isinstance( value, unicode ) or isinstance( value, str ): return value else: raise Exception( "FileToolParameter cannot be persisted" ) def get_initial_value( self, trans, context ): return None class HiddenToolParameter( ToolParameter ): """ Parameter that takes one of two values. FIXME: This seems hacky, parameters should only describe things the user might change. It is used for 'initializing' the UCSC proxy tool >>> p = HiddenToolParameter( None, XML( '' ) ) >>> print p.name blah >>> print p.get_html() """ def __init__( self, tool, elem ): ToolParameter.__init__( self, tool, elem ) self.value = elem.get( 'value' ) def get_html_field( self, trans=None, value=None, other_values={} ): return form_builder.HiddenField( self.name, self.value ) def get_initial_value( self, trans, context ): return self.value def get_label( self ): return None ## This is clearly a HACK, parameters should only be used for things the user ## can change, there needs to be a different way to specify this. I'm leaving ## it for now to avoid breaking any tools. class BaseURLToolParameter( ToolParameter ): """ Returns a parameter the contains its value prepended by the current server base url. Used in all redirects. """ def __init__( self, tool, elem ): ToolParameter.__init__( self, tool, elem ) self.value = elem.get( 'value', '' ) def get_value( self, trans ): # url = trans.request.base + self.value url = url_for( self.value, qualified=True ) return url def get_html_field( self, trans=None, value=None, other_values={} ): return form_builder.HiddenField( self.name, self.get_value( trans ) ) def get_initial_value( self, trans, context ): return self.value def get_label( self ): # BaseURLToolParameters are ultimately "hidden" parameters return None class SelectToolParameter( ToolParameter ): """ Parameter that takes on one (or many) or a specific set of values. >>> p = SelectToolParameter( None, XML( ... ''' ... ... ... ... ... ... ''' ) ) >>> print p.name blah >>> print p.get_html() >>> print p.get_html( value="z" ) >>> print p.filter_value( "y" ) y >>> p = SelectToolParameter( None, XML( ... ''' ... ... ... ... ... ... ''' ) ) >>> print p.name blah >>> print p.get_html() >>> print p.get_html( value=["x","y"]) >>> print p.to_param_dict_string( ["y", "z"] ) y,z >>> p = SelectToolParameter( None, XML( ... ''' ... ... ... ... ... ... ''' ) ) >>> print p.name blah >>> print p.get_html()
I am X
I am Y
I am Z
>>> print p.get_html( value=["x","y"])
I am X
I am Y
I am Z
>>> print p.to_param_dict_string( ["y", "z"] ) y,z """ def __init__( self, tool, elem, context=None ): ToolParameter.__init__( self, tool, elem ) self.multiple = string_as_bool( elem.get( 'multiple', False ) ) self.display = elem.get( 'display', None ) self.separator = elem.get( 'separator', ',' ) self.legal_values = set() # TODO: the tag is deprecated and should be replaced with the tag. self.dynamic_options = elem.get( "dynamic_options", None ) options = elem.find( 'options' ) if options is None: self.options = None else: self.options = dynamic_options.DynamicOptions( options, self ) for validator in self.options.validators: self.validators.append( validator ) if self.dynamic_options is None and self.options is None: self.static_options = list() for index, option in enumerate( elem.findall( "option" ) ): value = option.get( "value" ) self.legal_values.add( value ) selected = string_as_bool( option.get( "selected", False ) ) self.static_options.append( ( option.text, value, selected ) ) self.is_dynamic = ( ( self.dynamic_options is not None ) or ( self.options is not None ) ) def get_options( self, trans, other_values ): if self.options: return self.options.get_options( trans, other_values ) elif self.dynamic_options: return eval( self.dynamic_options, self.tool.code_namespace, other_values ) else: return self.static_options def get_legal_values( self, trans, other_values ): def _get_UnvalidatedValue_value( value ): if isinstance( value, UnvalidatedValue ): return value.value return value if self.options: return map( _get_UnvalidatedValue_value, set( v for _, v, _ in self.options.get_options( trans, other_values ) ) ) elif self.dynamic_options: return set( v for _, v, _ in eval( self.dynamic_options, self.tool.code_namespace, other_values ) ) else: return self.legal_values def get_html_field( self, trans=None, value=None, context={} ): # Dynamic options are not yet supported in workflow, allow # specifying the value as text for now. if self.need_late_validation( trans, context ): assert isinstance( value, UnvalidatedValue ) value = value.value if self.multiple: if value is None: value = "" else: value = "\n".join( value ) return form_builder.TextArea( self.name, value=value ) else: return form_builder.TextField( self.name, value=(value or "") ) if value is not None: if not isinstance( value, list ): value = [ value ] field = form_builder.SelectField( self.name, self.multiple, self.display, self.refresh_on_change, refresh_on_change_values = self.refresh_on_change_values ) options = self.get_options( trans, context ) for text, optval, selected in options: if isinstance( optval, UnvalidatedValue ): optval = optval.value text = "%s (unvalidated)" % text if value: selected = ( optval in value ) field.add_option( text, optval, selected ) return field def from_html( self, value, trans=None, context={} ): if self.need_late_validation( trans, context ): if self.multiple: # While it is generally allowed that a select value can be '', # we do not allow this to be the case in a dynamically # generated multiple select list being set in workflow building # mode we instead treat '' as 'No option Selected' (None) if value == '': value = None else: if not isinstance( value, list ): value = value.split( "\n" ) return UnvalidatedValue( value ) legal_values = self.get_legal_values( trans, context ) if isinstance( value, list ): if not(self.repeat): assert self.multiple, "Multiple values provided but parameter is not expecting multiple values" rval = [] for v in value: v = util.restore_text( v ) if v not in legal_values: raise ValueError( "An invalid option was selected, please verify" ) rval.append( v ) return rval else: value = util.restore_text( value ) if value not in legal_values: raise ValueError( "An invalid option was selected, please verify" ) return value def to_param_dict_string( self, value, other_values={} ): if value is None: return "None" if isinstance( value, list ): if not( self.repeat ): assert self.multiple, "Multiple values provided but parameter is not expecting multiple values" value = map( str, value ) else: value = str( value ) if self.tool is None or self.tool.options.sanitize: if self.sanitizer: value = self.sanitizer.sanitize_param( value ) else: value = sanitize_param( value ) if isinstance( value, list ): value = self.separator.join( value ) return value def value_to_basic( self, value, app ): if isinstance( value, UnvalidatedValue ): return { "__class__": "UnvalidatedValue", "value": value.value } elif isinstance( value, RuntimeValue ): # Need to handle runtime value's ourself since delegating to the # parent method causes the value to be turned into a string, which # breaks multiple selection return { "__class__": "RuntimeValue" } return value def value_from_basic( self, value, app, ignore_errors=False ): if isinstance( value, dict ) and value["__class__"] == "UnvalidatedValue": return UnvalidatedValue( value["value"] ) return super( SelectToolParameter, self ).value_from_basic( value, app ) def need_late_validation( self, trans, context ): """ Determine whether we need to wait to validate this parameters value given the current state. For parameters with static options this is always false (can always validate immediately). For parameters with dynamic options, we need to check whether the other parameters which determine what options are valid have been set. For the old style dynamic options which do not specify dependencies, this is always true (must valiate at runtime). """ # Option list is statically defined, never need late validation if not self.is_dynamic: return False # Old style dynamic options, no dependency information so there isn't # a lot we can do: if we're dealing with workflows, have to assume # late validation no matter what. if self.dynamic_options is not None: return ( trans is None or trans.workflow_building_mode ) # If we got this far, we can actually look at the dependencies # to see if their values will not be available until runtime. for dep_name in self.get_dependencies(): dep_value = context[ dep_name ] # Dependency on a dataset that does not yet exist if isinstance( dep_value, DummyDataset ): return True # Dependency on a value that has not been checked if isinstance( dep_value, UnvalidatedValue ): return True # Dependency on a value that does not yet exist if isinstance( dep_value, RuntimeValue ): return True # Dynamic, but all dependenceis are known and have values return False def get_initial_value( self, trans, context ): # More working around dynamic options for workflow if self.need_late_validation( trans, context ): # Really the best we can do? return UnvalidatedValue( None ) options = list( self.get_options( trans, context ) ) value = [ optval for _, optval, selected in options if selected ] if len( value ) == 0: if not self.multiple and options: # Nothing selected, but not a multiple select, with some values, # so we have to default to something (the HTML form will anyway) # TODO: deal with optional parameters in a better way value = options[0][1] else: value = None elif len( value ) == 1: value = value[0] return value def value_to_display_text( self, value, app ): if isinstance( value, UnvalidatedValue ): suffix = "\n(value not yet validated)" value = value.value else: suffix = "" if not isinstance( value, list ): value = [ value ] # FIXME: Currently only translating values back to labels if they # are not dynamic if self.is_dynamic: rval = map( str, value ) else: options = list( self.static_options ) rval = [] for t, v, s in options: if v in value: rval.append( t ) return "\n".join( rval ) + suffix def get_dependencies( self ): """ Get the *names* of the other params this param depends on. """ if self.options: return self.options.get_dependency_names() else: return [] class GenomeBuildParameter( SelectToolParameter ): """ Select list that sets the last used genome build for the current history as "selected". >>> # Create a mock transcation with 'hg17' as the current build >>> from galaxy.util.bunch import Bunch >>> trans = Bunch( history=Bunch( genome_build='hg17' ), db_builds=util.dbnames ) >>> p = GenomeBuildParameter( None, XML( ... ''' ... ... ''' ) ) >>> print p.name blah >>> # hg17 should be selected by default >>> print p.get_html( trans ) # doctest: +ELLIPSIS >>> # If the user selected something else already, that should be used >>> # instead >>> print p.get_html( trans, value='hg18' ) # doctest: +ELLIPSIS >>> print p.filter_value( "hg17" ) hg17 """ def get_options( self, trans, other_values ): last_used_build = trans.history.genome_build for dbkey, build_name in trans.db_builds: yield build_name, dbkey, ( dbkey == last_used_build ) def get_legal_values( self, trans, other_values ): return set( dbkey for dbkey, _ in trans.db_builds ) class ColumnListParameter( SelectToolParameter ): """ Select list that consists of either the total number of columns or only those columns that contain numerical values in the associated DataToolParameter. # TODO: we need better testing here, but not sure how to associate a DatatoolParameter with a ColumnListParameter # from a twill perspective... >>> # Mock up a history (not connected to database) >>> from galaxy.model import History, HistoryDatasetAssociation >>> from galaxy.util.bunch import Bunch >>> from galaxy.model.mapping import context as sa_session >>> hist = History() >>> sa_session.add( hist ) >>> sa_session.flush() >>> hist.add_dataset( HistoryDatasetAssociation( id=1, extension='interval', create_dataset=True, sa_session=sa_session ) ) >>> dtp = DataToolParameter( None, XML( '' ) ) >>> print dtp.name blah >>> clp = ColumnListParameter ( None, XML( '' ) ) >>> print clp.name numerical_column """ def __init__( self, tool, elem ): SelectToolParameter.__init__( self, tool, elem ) self.tool = tool self.numerical = string_as_bool( elem.get( "numerical", False )) self.force_select = string_as_bool( elem.get( "force_select", True )) self.accept_default = string_as_bool( elem.get( "accept_default", False )) self.data_ref = elem.get( "data_ref", None ) self.default_value = elem.get( "default_value", None ) self.is_dynamic = True def from_html( self, value, trans=None, context={} ): """ Label convention prepends column number with a 'c', but tool uses the integer. This removes the 'c' when entered into a workflow. """ def _strip_c( column ): column = column.strip().lower() if column.startswith( 'c' ): column = column[1:] return column if self.multiple: #split on newline and , if value: column_list = [] if not isinstance( value, list ): value = value.split( '\n' ) for column in value: for column2 in column.split( ',' ): column2 = column2.strip() if column2: column_list.append( column2 ) value = map( _strip_c, column_list ) else: value = [] else: if value: value = _strip_c( value ) else: value = None return super( ColumnListParameter, self ).from_html( value, trans, context ) def get_column_list( self, trans, other_values ): """ Generate a select list containing the columns of the associated dataset (if found). """ column_list = [] # No value indicates a configuration error, the named DataToolParameter # must preceed this parameter in the config assert self.data_ref in other_values, "Value for associated DataToolParameter not found" # Get the value of the associated DataToolParameter (a dataset) dataset = other_values[ self.data_ref ] # Check if a dataset is selected if dataset is None or dataset == '': # NOTE: Both of these values indicate that no dataset is selected. # However, 'None' indicates that the dataset is optional # while '' indicates that it is not. Currently column # parameters do not work well with optional datasets return column_list # Generate options if not dataset.metadata.columns: if self.accept_default: column_list.append( self.default_value or '1' ) return column_list if not self.force_select: column_list.append( 'None' ) if self.numerical: # If numerical was requsted, filter columns based on metadata for i, col in enumerate( dataset.metadata.column_types ): if col == 'int' or col == 'float': column_list.append( str( i + 1 ) ) else: for i in range(0, dataset.metadata.columns): column_list.append( str( i + 1 ) ) return column_list def get_options( self, trans, other_values ): options = [] column_list = self.get_column_list( trans, other_values ) if len( column_list ) > 0 and not self.force_select: options.append( ('?', 'None', False) ) for col in column_list: if col != 'None': options.append( ( 'c' + col, col, False ) ) return options def get_initial_value( self, trans, context ): if self.default_value is not None: # dataset not ready / in workflow / etc if self.need_late_validation( trans, context ): return UnvalidatedValue( self.default_value ) return self.default_value return SelectToolParameter.get_initial_value( self, trans, context ) def get_legal_values( self, trans, other_values ): return set( self.get_column_list( trans, other_values ) ) def get_dependencies( self ): return [ self.data_ref ] def need_late_validation( self, trans, context ): if super( ColumnListParameter, self ).need_late_validation( trans, context ): return True # Get the selected dataset if selected dataset = context[ self.data_ref ] if dataset: # Check if the dataset does not have the expected metadata for columns if not dataset.metadata.columns: # Only allow late validation if the dataset is not yet ready # (since we have reason to expect the metadata to be ready eventually) if dataset.is_pending: return True # No late validation return False class DrillDownSelectToolParameter( SelectToolParameter ): """ Parameter that takes on one (or many) of a specific set of values. Creating a hierarchical select menu, which allows users to 'drill down' a tree-like set of options. >>> p = DrillDownSelectToolParameter( None, XML( ... ''' ... ... ... ... ... ... ... ''' ) ) >>> print p.get_html()
  • [+]Heading 1
    • Option 1
    • Option 2
    • [+]Heading 1
      • Option 3
      • Option 4
  • Option 5
>>> p = DrillDownSelectToolParameter( None, XML( ... ''' ... ... ... ... ... ... ... ''' ) ) >>> print p.get_html()
  • [+]Heading 1
    • Option 1
    • Option 2
    • [+]Heading 1
      • Option 3
      • Option 4
  • Option 5
>>> print p.options [{'selected': False, 'name': 'Heading 1', 'value': 'heading1', 'options': [{'selected': False, 'name': 'Option 1', 'value': 'option1', 'options': []}, {'selected': False, 'name': 'Option 2', 'value': 'option2', 'options': []}, {'selected': False, 'name': 'Heading 1', 'value': 'heading1', 'options': [{'selected': False, 'name': 'Option 3', 'value': 'option3', 'options': []}, {'selected': False, 'name': 'Option 4', 'value': 'option4', 'options': []}]}]}, {'selected': False, 'name': 'Option 5', 'value': 'option5', 'options': []}] """ def __init__( self, tool, elem, context=None ): def recurse_option_elems( cur_options, option_elems ): for option_elem in option_elems: selected = string_as_bool( option_elem.get( 'selected', False ) ) cur_options.append( { 'name':option_elem.get( 'name' ), 'value': option_elem.get( 'value'), 'options':[], 'selected':selected } ) recurse_option_elems( cur_options[-1]['options'], option_elem.findall( 'option' ) ) ToolParameter.__init__( self, tool, elem ) self.multiple = string_as_bool( elem.get( 'multiple', False ) ) self.display = elem.get( 'display', None ) self.hierarchy = elem.get( 'hierarchy', 'exact' ) #exact or recurse self.separator = elem.get( 'separator', ',' ) from_file = elem.get( 'from_file', None ) if from_file: if not os.path.isabs( from_file ): from_file = os.path.join( tool.app.config.tool_data_path, from_file ) elem = XML( "%s" % open( from_file ).read() ) self.is_dynamic = False self.dynamic_options = None #backwards compatibility with SelectToolParameter's old dynamic options and late validation self.options = [] self.filtered = {} if elem.find( 'filter' ): self.is_dynamic = True for filter in elem.findall( 'filter' ): #currently only filtering by metadata key matching input file is allowed if filter.get( 'type' ) == 'data_meta': if filter.get( 'data_ref' ) not in self.filtered: self.filtered[filter.get( 'data_ref' )] = {} if filter.get( 'meta_key' ) not in self.filtered[filter.get( 'data_ref' )]: self.filtered[filter.get( 'data_ref' )][filter.get( 'meta_key' )] = {} if filter.get( 'value' ) not in self.filtered[filter.get( 'data_ref' )][filter.get( 'meta_key' )]: self.filtered[filter.get( 'data_ref' )][filter.get( 'meta_key' )][filter.get( 'value' )] = [] recurse_option_elems( self.filtered[filter.get( 'data_ref' )][filter.get( 'meta_key' )][filter.get( 'value' )], filter.find( 'options' ).findall( 'option' ) ) else: recurse_option_elems( self.options, elem.find( 'options' ).findall( 'option' ) ) def get_options( self, trans=None, value=None, other_values={} ): if self.is_dynamic: options = [] for filter_key, filter_value in self.filtered.iteritems(): dataset = other_values[filter_key] if dataset.__class__.__name__.endswith( "DatasetFilenameWrapper" ): #this is a bad way to check for this, but problems importing class ( due to circular imports? ) dataset = dataset.dataset if dataset: for meta_key, meta_dict in filter_value.iteritems(): check_meta_val = dataset.metadata.spec[meta_key].param.to_string( dataset.metadata.get( meta_key ) ) if check_meta_val in meta_dict: options.extend( meta_dict[check_meta_val] ) return options return self.options def get_legal_values( self, trans, other_values ): def recurse_options( legal_values, options ): for option in options: legal_values.append( option['value'] ) recurse_options( legal_values, option['options'] ) legal_values = [] recurse_options( legal_values, self.get_options( trans=trans, other_values=other_values ) ) return legal_values def get_html( self, trans=None, value=None, other_values={} ): """ Returns the html widget corresponding to the paramter. Optionally attempt to retain the current value specific by 'value' """ return self.get_html_field( trans, value, other_values ).get_html() def get_html_field( self, trans=None, value=None, other_values={} ): # Dynamic options are not yet supported in workflow, allow # specifying the value as text for now. if self.need_late_validation( trans, other_values ): if value is not None: assert isinstance( value, UnvalidatedValue ) value = value.value if self.multiple: if value is None: value = "" else: value = "\n".join( value ) return form_builder.TextArea( self.name, value=value ) else: return form_builder.TextField( self.name, value=(value or "") ) return form_builder.DrillDownField( self.name, self.multiple, self.display, self.refresh_on_change, self.get_options( trans, value, other_values ), value, refresh_on_change_values = self.refresh_on_change_values ) def from_html( self, value, trans=None, other_values={} ): if self.need_late_validation( trans, other_values ): if self.multiple: if value == '': #No option selected value = None else: value = value.split( "\n" ) return UnvalidatedValue( value ) if not value: return None if not isinstance( value, list ): value = [value] if not( self.repeat ) and len( value ) > 1: assert self.multiple, "Multiple values provided but parameter is not expecting multiple values" rval = [] for val in value: if val not in self.get_legal_values( trans, other_values ): raise ValueError( "An invalid option was selected, please verify" ) rval.append( util.restore_text( val ) ) return rval def to_param_dict_string( self, value, other_values={} ): def get_options_list( value ): def get_base_option( value, options ): for option in options: if value == option['value']: return option rval = get_base_option( value, option['options'] ) if rval: return rval return None #not found def recurse_option( option_list, option ): if not option['options']: option_list.append( option['value'] ) else: for opt in option['options']: recurse_option( option_list, opt ) rval = [] recurse_option( rval, get_base_option( value, self.get_options( other_values = other_values ) ) ) return rval or [value] if value is None: return "None" rval = [] if self.hierarchy == "exact": rval = value else: for val in value: options = get_options_list( val ) rval.extend( options ) if len( rval ) > 1: if not( self.repeat ): assert self.multiple, "Multiple values provided but parameter is not expecting multiple values" rval = self.separator.join( rval ) if self.tool is None or self.tool.options.sanitize: if self.sanitizer: rval = self.sanitizer.sanitize_param( rval ) else: rval = sanitize_param( rval ) return rval def get_initial_value( self, trans, context ): def recurse_options( initial_values, options ): for option in options: if option['selected']: initial_values.append( option['value'] ) recurse_options( initial_values, option['options'] ) # More working around dynamic options for workflow if self.need_late_validation( trans, context ): # Really the best we can do? return UnvalidatedValue( None ) initial_values = [] recurse_options( initial_values, self.get_options( trans=trans, other_values=context ) ) return initial_values def value_to_display_text( self, value, app ): def get_option_display( value, options ): for option in options: if value == option['value']: return option['name'] rval = get_option_display( value, option['options'] ) if rval: return rval return None #not found if isinstance( value, UnvalidatedValue ): suffix = "\n(value not yet validated)" value = value.value else: suffix = "" if not value: value = [] elif not isinstance( value, list ): value = [ value ] # FIXME: Currently only translating values back to labels if they # are not dynamic if self.is_dynamic: if value: if isinstance( value, list ): rval = value else: rval = [ value ] else: rval = [] else: rval = [] for val in value: rval.append( get_option_display( val, self.options ) or val ) return "\n".join( rval ) + suffix def get_dependencies( self ): """ Get the *names* of the other params this param depends on. """ return self.filtered.keys() class DummyDataset( object ): pass class DataToolParameter( ToolParameter ): # TODO, Nate: Make sure the following unit tests appropriately test the dataset security # components. Add as many additional tests as necessary. """ Parameter that takes on one (or many) or a specific set of values. TODO: There should be an alternate display that allows single selects to be displayed as radio buttons and multiple selects as a set of checkboxes TODO: The following must be fixed to test correctly for the new security_check tag in the DataToolParameter ( the last test below is broken ) Nate's next passs at the dataset security stuff will dramatically alter this anyway. """ def __init__( self, tool, elem ): ToolParameter.__init__( self, tool, elem ) # Add metadata validator if not string_as_bool( elem.get( 'no_validation', False ) ): self.validators.append( validation.MetadataValidator() ) # Build tuple of classes for supported data formats formats = [] self.extensions = elem.get( 'format', 'data' ).split( "," ) for extension in self.extensions: extension = extension.strip() if tool is None: #This occurs for things such as unit tests import galaxy.datatypes.registry formats.append( galaxy.datatypes.registry.Registry().get_datatype_by_extension( extension.lower() ).__class__ ) else: formats.append( tool.app.datatypes_registry.get_datatype_by_extension( extension.lower() ).__class__ ) self.formats = tuple( formats ) self.multiple = string_as_bool( elem.get( 'multiple', False ) ) # Optional DataToolParameters are used in tools like GMAJ and LAJ self.optional = string_as_bool( elem.get( 'optional', False ) ) # TODO: Enhance dynamic options for DataToolParameters. Currently, # only the special case key='build' of type='data_meta' is # a valid filter options = elem.find( 'options' ) if options is None: self.options = None else: self.options = dynamic_options.DynamicOptions( options, self ) self.is_dynamic = self.options is not None # Load conversions required for the dataset input self.conversions = [] for conv_elem in elem.findall( "conversion" ): name = conv_elem.get( "name" ) #name for commandline substitution conv_extensions = conv_elem.get( "type" ) #target datatype extension # FIXME: conv_extensions should be able to be an ordered list assert None not in [ name, type ], 'A name (%s) and type (%s) are required for explicit conversion' % ( name, type ) conv_types = tool.app.datatypes_registry.get_datatype_by_extension( conv_extensions.lower() ).__class__ self.conversions.append( ( name, conv_extensions, conv_types ) ) def get_html_field( self, trans=None, value=None, other_values={} ): filter_value = None if self.options: try: filter_value = self.options.get_options( trans, other_values )[0][0] except IndexError: pass #no valid options assert trans is not None, "DataToolParameter requires a trans" history = trans.get_history() assert history is not None, "DataToolParameter requires a history" if value is not None: if type( value ) != list: value = [ value ] field = form_builder.SelectField( self.name, self.multiple, None, self.refresh_on_change, refresh_on_change_values = self.refresh_on_change_values ) # CRUCIAL: the dataset_collector function needs to be local to DataToolParameter.get_html_field() def dataset_collector( hdas, parent_hid ): current_user_roles = trans.get_current_user_roles() for i, hda in enumerate( hdas ): if len( hda.name ) > 30: hda_name = '%s..%s' % ( hda.name[:17], hda.name[-11:] ) else: hda_name = hda.name if parent_hid is not None: hid = "%s.%d" % ( parent_hid, i + 1 ) else: hid = str( hda.hid ) if not hda.dataset.state in [galaxy.model.Dataset.states.ERROR, galaxy.model.Dataset.states.DISCARDED] and \ hda.visible and \ trans.app.security_agent.can_access_dataset( current_user_roles, hda.dataset ): # If we are sending data to an external application, then we need to make sure there are no roles # associated with the dataset that restrict it's access from "public". if self.tool and self.tool.tool_type == 'data_destination' and not trans.app.security_agent.dataset_is_public( hda.dataset ): continue if self.options and hda.get_dbkey() != filter_value: continue if isinstance( hda.datatype, self.formats): selected = ( value and ( hda in value ) ) field.add_option( "%s: %s" % ( hid, hda_name ), hda.id, selected ) else: target_ext, converted_dataset = hda.find_conversion_destination( self.formats, converter_safe = self.converter_safe( other_values, trans ) ) if target_ext: if converted_dataset: hda = converted_dataset if not trans.app.security_agent.can_access_dataset( current_user_roles, hda.dataset ): continue selected = ( value and ( hda in value ) ) field.add_option( "%s: (as %s) %s" % ( hid, target_ext, hda_name ), hda.id, selected ) # Also collect children via association object dataset_collector( hda.children, hid ) dataset_collector( history.active_datasets, None ) some_data = bool( field.options ) if some_data: if value is None or len( field.options ) == 1: # Ensure that the last item is always selected a, b, c = field.options[-1] if self.optional: field.options[-1] = a, b, False else: field.options[-1] = a, b, True if self.optional: if not value: field.add_option( "Selection is Optional", 'None', True ) else: field.add_option( "Selection is Optional", 'None', False ) return field def get_initial_value( self, trans, context ): """ NOTE: This is wasteful since dynamic options and dataset collection happens twice (here and when generating HTML). """ # Can't look at history in workflow mode if trans.workflow_building_mode: return DummyDataset() assert trans is not None, "DataToolParameter requires a trans" history = trans.get_history() assert history is not None, "DataToolParameter requires a history" if self.optional: return None most_recent_dataset = [None] filter_value = None if self.options: try: filter_value = self.options.get_options( trans, context )[0][0] except IndexError: pass #no valid options def dataset_collector( datasets ): def is_convertable( dataset ): target_ext, converted_dataset = dataset.find_conversion_destination( self.formats, converter_safe = self.converter_safe( None, trans ) ) if target_ext is not None: return True return False for i, data in enumerate( datasets ): if data.visible and not data.deleted and data.state not in [data.states.ERROR, data.states.DISCARDED] and ( isinstance( data.datatype, self.formats) or is_convertable( data ) ): if self.options and data.get_dbkey() != filter_value: continue most_recent_dataset[0] = data # Also collect children via association object dataset_collector( data.children ) dataset_collector( history.datasets ) most_recent_dataset = most_recent_dataset.pop() if most_recent_dataset is not None: return most_recent_dataset else: return '' def from_html( self, value, trans, other_values={} ): # Can't look at history in workflow mode, skip validation and such, # although, this should never be called in workflow mode right? if trans.workflow_building_mode: return None if not value: raise ValueError( "History does not include a dataset of the required format / build" ) if value in [None, "None"]: return None if isinstance( value, list ): return [ trans.sa_session.query( trans.app.model.HistoryDatasetAssociation ).get( v ) for v in value ] elif isinstance( value, trans.app.model.HistoryDatasetAssociation ): return value else: return trans.sa_session.query( trans.app.model.HistoryDatasetAssociation ).get( value ) def to_string( self, value, app ): if value is None or isinstance( value, str ): return value elif isinstance( value, DummyDataset ): return None return value.id def to_python( self, value, app ): # Both of these values indicate that no dataset is selected. However, 'None' # indicates that the dataset is optional, while '' indicates that it is not. if value is None or value == '' or value == 'None': return value return app.model.context.query( app.model.HistoryDatasetAssociation ).get( int( value ) ) def to_param_dict_string( self, value, other_values={} ): if value is None: return "None" return value.file_name def value_to_display_text( self, value, app ): if value: return "%s: %s" % ( value.hid, value.name ) else: return "No dataset" def get_dependencies( self ): """ Get the *names* of the other params this param depends on. """ if self.options: return self.options.get_dependency_names() else: return [] def converter_safe( self, other_values, trans ): if self.tool is None or self.tool.has_multiple_pages or not hasattr( trans, 'workflow_building_mode' ) or trans.workflow_building_mode: return False if other_values is None: return True # we don't know other values, so we can't check, assume ok converter_safe = [True] def visitor( prefix, input, value, parent = None ): if isinstance( input, SelectToolParameter ) and self.name in input.get_dependencies(): if input.is_dynamic and ( input.dynamic_options or ( not input.dynamic_options and not input.options ) or not input.options.converter_safe ): converter_safe[0] = False #This option does not allow for conversion, i.e. uses contents of dataset file to generate options self.tool.visit_inputs( other_values, visitor ) return False not in converter_safe # class RawToolParameter( ToolParameter ): # """ # Completely nondescript parameter, HTML representation is provided as text # contents. # # >>> p = RawToolParameter( None, XML( # ... ''' # ... # ... Some random stuff]]> # ... # ... ''' ) ) # >>> print p.name # blah # >>> print p.get_html().strip() # Some random stuff # """ # def __init__( self, tool, elem ): # ToolParameter.__init__( self, tool, elem ) # self.template = string.Template( elem.text ) # def get_html( self, prefix="" ): # context = dict( self.__dict__ ) # context.update( dict( prefix=prefix ) ) # return self.template.substitute( context ) # class HistoryIDParameter( ToolParameter ): # """ # Parameter that takes a name value, makes history.id available. # # FIXME: This is a hack (esp. if hidden params are a hack) but in order to # have the history accessable at the job level, it is necessary # I also probably wrote this docstring test thing wrong. # # >>> from galaxy.model import History # >>> from galaxy.util.bunch import Bunch # >>> hist = History( id=1 ) # >>> p = HistoryIDParameter( None, XML( '' ) ) # >>> print p.name # blah # >>> html_string = '' % hist.id # >>> assert p.get_html( trans=Bunch( history=hist ) ) == html_string # """ # def __init__( self, tool, elem ): # ToolParameter.__init__( self, tool, elem ) # def get_html( self, trans, value=None, other_values={} ): # assert trans.history is not None, "HistoryIDParameter requires a history" # self.html = form_builder.HiddenField( self.name, trans.history.id ).get_html() # return self.html parameter_types = dict( text = TextToolParameter, integer = IntegerToolParameter, float = FloatToolParameter, boolean = BooleanToolParameter, genomebuild = GenomeBuildParameter, select = SelectToolParameter, data_column = ColumnListParameter, hidden = HiddenToolParameter, baseurl = BaseURLToolParameter, file = FileToolParameter, data = DataToolParameter, drill_down = DrillDownSelectToolParameter ) class UnvalidatedValue( object ): """ Wrapper to mark a value that has not been validated """ def __init__( self, value ): self.value = value def __str__( self ): return str( self.value ) class RuntimeValue( object ): """ Wrapper to note a value that is not yet set, but will be required at runtime. """ pass