""" Provides mapping between extensions and datatypes, mime-types, etc. """ import os, tempfile import logging import data, tabular, interval, images, sequence, qualityscore, genetics, xml, coverage, tracks, chrominfo, binary, assembly, ngsindex import galaxy.util from galaxy.util.odict import odict from display_applications.application import DisplayApplication class ConfigurationError( Exception ): pass class Registry( object ): def __init__( self, root_dir=None, config=None ): self.log = logging.getLogger(__name__) self.log.addHandler( logging.NullHandler() ) self.datatypes_by_extension = {} self.mimetypes_by_extension = {} self.datatype_converters = odict() self.datatype_indexers = odict() self.converters = [] self.converter_deps = {} self.available_tracks = [] self.set_external_metadata_tool = None self.indexers = [] self.sniff_order = [] self.upload_file_formats = [] self.display_applications = odict() #map a display application id to a display application inherit_display_application_by_class = [] if root_dir and config: # Parse datatypes_conf.xml tree = galaxy.util.parse_xml( config ) root = tree.getroot() # Load datatypes and converters from config self.log.debug( 'Loading datatypes from %s' % config ) registration = root.find( 'registration' ) self.datatype_converters_path = os.path.join( root_dir, registration.get( 'converters_path', 'lib/galaxy/datatypes/converters' ) ) self.datatype_indexers_path = os.path.join( root_dir, registration.get( 'indexers_path', 'lib/galaxy/datatypes/indexers' ) ) self.display_applications_path = os.path.join( root_dir, registration.get( 'display_path', 'display_applications' ) ) if not os.path.isdir( self.datatype_converters_path ): raise ConfigurationError( "Directory does not exist: %s" % self.datatype_converters_path ) if not os.path.isdir( self.datatype_indexers_path ): raise ConfigurationError( "Directory does not exist: %s" % self.datatype_indexers_path ) for elem in registration.findall( 'datatype' ): try: extension = elem.get( 'extension', None ) dtype = elem.get( 'type', None ) mimetype = elem.get( 'mimetype', None ) display_in_upload = elem.get( 'display_in_upload', False ) if extension and dtype: fields = dtype.split( ':' ) datatype_module = fields[0] datatype_class = fields[1] fields = datatype_module.split( '.' ) module = __import__( fields.pop(0) ) for mod in fields: module = getattr( module, mod ) self.datatypes_by_extension[extension] = getattr( module, datatype_class )() if mimetype is None: # Use default mime type as per datatype spec mimetype = self.datatypes_by_extension[extension].get_mime() self.mimetypes_by_extension[extension] = mimetype if hasattr( getattr( module, datatype_class ), "get_track_type" ): self.available_tracks.append( extension ) if display_in_upload: self.upload_file_formats.append( extension ) #max file size cut off for setting optional metadata self.datatypes_by_extension[extension].max_optional_metadata_filesize = elem.get( 'max_optional_metadata_filesize', None ) for converter in elem.findall( 'converter' ): # Build the list of datatype converters which will later be loaded # into the calling app's toolbox. 
                        converter_config = converter.get( 'file', None )
                        target_datatype = converter.get( 'target_datatype', None )
                        depends_on = converter.get( 'depends_on', None )
                        if depends_on and target_datatype:
                            if extension not in self.converter_deps:
                                self.converter_deps[extension] = {}
                            self.converter_deps[extension][target_datatype] = depends_on.split(',')
                        if converter_config and target_datatype:
                            self.converters.append( ( converter_config, extension, target_datatype ) )
                    for indexer in elem.findall( 'indexer' ):
                        # Build the list of datatype indexers for track building
                        indexer_config = indexer.get( 'file', None )
                        if indexer_config:
                            self.indexers.append( ( indexer_config, extension ) )
                    for composite_file in elem.findall( 'composite_file' ):
                        # Add composite files
                        name = composite_file.get( 'name', None )
                        if name is None:
                            self.log.warning( "You must provide a name for your composite_file (%s)." % composite_file )
                        optional = composite_file.get( 'optional', False )
                        mimetype = composite_file.get( 'mimetype', None )
                        self.datatypes_by_extension[extension].add_composite_file( name, optional=optional, mimetype=mimetype )
                    for display_app in elem.findall( 'display' ):
                        display_file = os.path.join( self.display_applications_path, display_app.get( 'file', None ) )
                        try:
                            inherit = galaxy.util.string_as_bool( display_app.get( 'inherit', 'False' ) )
                            display_app = DisplayApplication.from_file( display_file, self )
                            if display_app:
                                if display_app.id in self.display_applications:
                                    # If we already loaded this display application, use the first one again
                                    display_app = self.display_applications[ display_app.id ]
                                self.log.debug( "Loaded display application '%s' for datatype '%s', inherit=%s" % ( display_app.id, extension, inherit ) )
                                self.display_applications[ display_app.id ] = display_app # Display app by id
                                self.datatypes_by_extension[ extension ].add_display_application( display_app )
                                if inherit and ( self.datatypes_by_extension[extension], display_app ) not in inherit_display_application_by_class:
                                    # Subclass inheritance will need to wait until all datatypes have been loaded
                                    inherit_display_application_by_class.append( ( self.datatypes_by_extension[extension], display_app ) )
                        except:
                            self.log.exception( "error reading display application from path: %s" % display_file )
                except Exception, e:
                    self.log.warning( 'Error loading datatype "%s", problem: %s' % ( extension, str( e ) ) )
            # Handle display_application subclass inheritance here:
            for ext, d_type1 in self.datatypes_by_extension.iteritems():
                for d_type2, display_app in inherit_display_application_by_class:
                    current_app = d_type1.get_display_application( display_app.id, None )
                    if current_app is None and isinstance( d_type1, type( d_type2 ) ):
                        d_type1.add_display_application( display_app )
            # Load datatype sniffers from the config
            sniff_order = []
            sniffers = root.find( 'sniffers' )
            for elem in sniffers.findall( 'sniffer' ):
                dtype = elem.get( 'type', None )
                if dtype:
                    sniff_order.append( dtype )
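            # Sniffer entries name a datatype in module:Class form; an illustrative
            # datatypes_conf.xml entry matching the parsing below:
            #   <sniffer type="galaxy.datatypes.xml:BlastXml"/>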
            for dtype in sniff_order:
                try:
                    fields = dtype.split( ":" )
                    datatype_module = fields[0]
                    datatype_class = fields[1]
                    fields = datatype_module.split( "." )
                    module = __import__( fields.pop(0) )
                    for mod in fields:
                        module = getattr( module, mod )
                    aclass = getattr( module, datatype_class )()
                    included = False
                    for atype in self.sniff_order:
                        if not issubclass( atype.__class__, aclass.__class__ ) and isinstance( atype, aclass.__class__ ):
                            included = True
                            break
                    if not included:
                        self.sniff_order.append( aclass )
                        self.log.debug( 'Loaded sniffer for datatype: %s' % dtype )
                except Exception, exc:
                    self.log.warning( 'Error appending datatype %s to sniff_order, problem: %s' % ( dtype, str( exc ) ) )
        # Default values
        if len(self.datatypes_by_extension) < 1:
            self.datatypes_by_extension = {
                'ab1'         : binary.Ab1(),
                'axt'         : sequence.Axt(),
                'bam'         : binary.Bam(),
                'bed'         : interval.Bed(),
                'blastxml'    : xml.BlastXml(),
                'coverage'    : coverage.LastzCoverage(),
                'customtrack' : interval.CustomTrack(),
                'csfasta'     : sequence.csFasta(),
                'fasta'       : sequence.Fasta(),
                'fastq'       : sequence.Fastq(),
                'fastqsanger' : sequence.FastqSanger(),
                'gtf'         : interval.Gtf(),
                'gff'         : interval.Gff(),
                'gff3'        : interval.Gff3(),
                'genetrack'   : tracks.GeneTrack(),
                'interval'    : interval.Interval(),
                'laj'         : images.Laj(),
                'lav'         : sequence.Lav(),
                'maf'         : sequence.Maf(),
                'pileup'      : tabular.Pileup(),
                'qualsolid'   : qualityscore.QualityScoreSOLiD(),
                'qualsolexa'  : qualityscore.QualityScoreSolexa(),
                'qual454'     : qualityscore.QualityScore454(),
                'sam'         : tabular.Sam(),
                'scf'         : binary.Scf(),
                'sff'         : binary.Sff(),
                'tabular'     : tabular.Tabular(),
                'taxonomy'    : tabular.Taxonomy(),
                'txt'         : data.Text(),
                'wig'         : interval.Wiggle()
            }
            self.mimetypes_by_extension = {
                'ab1'         : 'application/octet-stream',
                'axt'         : 'text/plain',
                'bam'         : 'application/octet-stream',
                'bed'         : 'text/plain',
                'blastxml'    : 'text/plain',
                'customtrack' : 'text/plain',
                'csfasta'     : 'text/plain',
                'fasta'       : 'text/plain',
                'fastq'       : 'text/plain',
                'fastqsanger' : 'text/plain',
                'gtf'         : 'text/plain',
                'gff'         : 'text/plain',
                'gff3'        : 'text/plain',
                'interval'    : 'text/plain',
                'laj'         : 'text/plain',
                'lav'         : 'text/plain',
                'maf'         : 'text/plain',
                'pileup'      : 'text/plain',
                'qualsolid'   : 'text/plain',
                'qualsolexa'  : 'text/plain',
                'qual454'     : 'text/plain',
                'sam'         : 'text/plain',
                'scf'         : 'application/octet-stream',
                'sff'         : 'application/octet-stream',
                'tabular'     : 'text/plain',
                'taxonomy'    : 'text/plain',
                'txt'         : 'text/plain',
                'wig'         : 'text/plain'
            }
        # Default values - the order in which we attempt to determine data types is critical
        # because some formats are much more flexibly defined than others.
        if len(self.sniff_order) < 1:
            self.sniff_order = [
                binary.Bam(),
                binary.Sff(),
                xml.BlastXml(),
                sequence.Maf(),
                sequence.Lav(),
                sequence.csFasta(),
                qualityscore.QualityScoreSOLiD(),
                qualityscore.QualityScore454(),
                sequence.Fasta(),
                sequence.Fastq(),
                interval.Wiggle(),
                images.Html(),
                sequence.Axt(),
                interval.Bed(),
                interval.CustomTrack(),
                interval.Gtf(),
                interval.Gff(),
                interval.Gff3(),
                tabular.Pileup(),
                interval.Interval(),
                tabular.Sam()
            ]
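        # For example, every valid Bed dataset is also a valid Interval dataset,
        # so interval.Bed() appears above before the more general interval.Interval();
        # swapping the two would cause Bed uploads to sniff as Interval.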
        def append_to_sniff_order():
            # Just in case any supported data types are not included in the config's sniff_order section.
            for ext in self.datatypes_by_extension:
                datatype = self.datatypes_by_extension[ext]
                included = False
                for atype in self.sniff_order:
                    if isinstance( atype, datatype.__class__ ):
                        included = True
                        break
                if not included:
                    self.sniff_order.append( datatype )
        append_to_sniff_order()

    def get_available_tracks(self):
        return self.available_tracks

    def get_mimetype_by_extension(self, ext, default = 'application/octet-stream' ):
        """Returns a mimetype based on an extension"""
        try:
            mimetype = self.mimetypes_by_extension[ext]
        except KeyError:
            # Datatype was never declared
            mimetype = default
            self.log.warning( 'unknown mimetype in data factory %s' % ext )
        return mimetype

    def get_datatype_by_extension(self, ext ):
        """Returns a datatype based on an extension"""
        try:
            builder = self.datatypes_by_extension[ext]
        except KeyError:
            builder = data.Text()
        return builder

    def change_datatype(self, data, ext, set_meta = True ):
        data.extension = ext
        # Call init_meta and copy metadata from itself.  The datatype
        # being converted *to* will handle any metadata copying and
        # initialization.
        if data.has_data():
            data.set_size()
            data.init_meta( copy_from=data )
            if set_meta:
                # Metadata is being set internally
                data.set_meta( overwrite = False )
                data.set_peek()
        return data

    def old_change_datatype(self, data, ext):
        """Creates and returns a new datatype based on an existing data and an extension"""
        newdata = factory(ext)(id=data.id)
        for key, value in data.__dict__.items():
            setattr(newdata, key, value)
        newdata.ext = ext
        return newdata

    def load_datatype_converters( self, toolbox ):
        """Adds datatype converters from self.converters to the calling app's toolbox"""
        for elem in self.converters:
            tool_config = elem[0]
            source_datatype = elem[1]
            target_datatype = elem[2]
            converter_path = os.path.join( self.datatype_converters_path, tool_config )
            try:
                converter = toolbox.load_tool( converter_path )
                toolbox.tools_by_id[converter.id] = converter
                if source_datatype not in self.datatype_converters:
                    self.datatype_converters[source_datatype] = odict()
                self.datatype_converters[source_datatype][target_datatype] = converter
                self.log.debug( "Loaded converter: %s", converter.id )
            except:
                self.log.exception( "error reading converter from path: %s" % converter_path )
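    # The mapping populated above is two-level: self.datatype_converters[ source_ext ][ target_ext ]
    # holds the loaded converter tool. An illustrative lookup (extensions hypothetical):
    #   converter = registry.datatype_converters[ 'bed' ][ 'interval' ]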
    def load_external_metadata_tool( self, toolbox ):
        """Adds a tool which is used to set external metadata"""
        # We need to be able to add a job to the queue to set metadata. The queue will
        # currently only accept jobs with an associated tool, so we'll create a special
        # tool to be used for auto-detecting metadata; this is less than ideal, but
        # effective. Properly building a tool without relying on parsing an XML file
        # is near impossible...so we'll create a temporary file.
        tool_xml_text = """
            <tool id="__SET_METADATA__" name="Set External Metadata" version="1.0.0" tool_type="set_metadata">
              <type class="SetMetadataTool" module="galaxy.tools"/>
              <action module="galaxy.tools.actions.metadata" class="SetMetadataToolAction"/>
              <command>$__SET_EXTERNAL_METADATA_COMMAND_LINE__</command>
              <inputs>
                <param format="data" name="input1" type="data" label="File to set metadata on."/>
                <param name="__ORIGINAL_DATASET_STATE__" type="hidden" value=""/>
                <param name="__SET_EXTERNAL_METADATA_COMMAND_LINE__" type="hidden" value=""/>
              </inputs>
            </tool>
            """
        tmp_name = tempfile.NamedTemporaryFile()
        tmp_name.write( tool_xml_text )
        tmp_name.flush()
        set_meta_tool = toolbox.load_tool( tmp_name.name )
        toolbox.tools_by_id[ set_meta_tool.id ] = set_meta_tool
        self.set_external_metadata_tool = set_meta_tool
        self.log.debug( "Loaded external metadata tool: %s", self.set_external_metadata_tool.id )

    def load_datatype_indexers( self, toolbox ):
        """Adds indexers from self.indexers to the toolbox from app"""
        for elem in self.indexers:
            tool_config = elem[0]
            datatype = elem[1]
            indexer = toolbox.load_tool( os.path.join( self.datatype_indexers_path, tool_config ) )
            toolbox.tools_by_id[indexer.id] = indexer
            self.datatype_indexers[datatype] = indexer
            self.log.debug( "Loaded indexer: %s", indexer.id )

    def get_converters_by_datatype(self, ext):
        """Returns available converters by source type"""
        converters = odict()
        source_datatype = type( self.get_datatype_by_extension(ext) )
        for ext2, converters_dict in self.datatype_converters.items():
            converter_datatype = type( self.get_datatype_by_extension(ext2) )
            if issubclass( source_datatype, converter_datatype ):
                converters.update( converters_dict )
        # Ensure ext-level converters are present
        if ext in self.datatype_converters.keys():
            converters.update( self.datatype_converters[ext] )
        return converters

    def get_indexers_by_datatype( self, ext ):
        """Returns indexers based on datatype"""
        class_chain = list()
        source_datatype = type( self.get_datatype_by_extension(ext) )
        for ext_spec in self.datatype_indexers.keys():
            datatype = type( self.get_datatype_by_extension(ext_spec) )
            if issubclass( source_datatype, datatype ):
                class_chain.append( ext_spec )
        # Prioritize based on class chain, most specific datatype first
        ext2type = lambda x: type( self.get_datatype_by_extension(x) )
        class_chain = sorted( class_chain, lambda x,y: issubclass( ext2type(x), ext2type(y) ) and -1 or 1 )
        return [ self.datatype_indexers[x] for x in class_chain ]

    def get_converter_by_target_type(self, source_ext, target_ext):
        """Returns a converter based on source and target datatypes"""
        converters = self.get_converters_by_datatype(source_ext)
        if target_ext in converters.keys():
            return converters[target_ext]
        return None

    def find_conversion_destination_for_dataset_by_extensions( self, dataset, accepted_formats, converter_safe = True ):
        """Returns ( target_ext, existing converted dataset )"""
        for convert_ext in self.get_converters_by_datatype( dataset.ext ):
            if isinstance( self.get_datatype_by_extension( convert_ext ), accepted_formats ):
                converted_dataset = dataset.get_converted_files_by_type( convert_ext )
                if converted_dataset:
                    ret_data = converted_dataset
                elif not converter_safe:
                    continue
                else:
                    ret_data = None
                return ( convert_ext, ret_data )
        return ( None, None )

    def get_composite_extensions( self ):
        return [ ext for ( ext, d_type ) in self.datatypes_by_extension.iteritems() if d_type.composite_type is not None ]
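    # An illustrative use of the conversion helpers above (the dataset object and
    # accepted formats tuple are hypothetical caller-supplied values):
    #   target_ext, converted = registry.find_conversion_destination_for_dataset_by_extensions(
    #       dataset, ( interval.Interval, ) )
    # target_ext is the first convertible extension whose datatype matches, and
    # converted is a previously converted dataset if one exists, otherwise None.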
= "\n".join( inputs ) if 'auto' not in rval and 'txt' in rval: #need to manually add 'auto' datatype rval[ 'auto' ] = rval[ 'txt' ] return rval