""" Galaxy Tool Shed data model classes Naming: try to use class names that have a distinct plural form so that the relationship cardinalities are obvious (e.g. prefer Dataset to Data) """ import os.path, os, errno, sys, codecs, operator, tempfile, logging, tarfile, mimetypes from galaxy.util.bunch import Bunch from galaxy.util.hash_util import * from galaxy.web.form_builder import * log = logging.getLogger( __name__ ) class User( object ): def __init__( self, email=None, password=None ): self.email = email self.password = password self.external = False self.deleted = False self.purged = False self.username = None # Relationships self.tools = [] def set_password_cleartext( self, cleartext ): """Set 'self.password' to the digest of 'cleartext'.""" self.password = new_secure_hash( text_type=cleartext ) def check_password( self, cleartext ): """Check if 'cleartext' matches 'self.password' when hashed.""" return self.password == new_secure_hash( text_type=cleartext ) class Group( object ): def __init__( self, name = None ): self.name = name self.deleted = False class Role( object ): private_id = None types = Bunch( PRIVATE = 'private', SYSTEM = 'system', USER = 'user', ADMIN = 'admin', SHARING = 'sharing' ) def __init__( self, name="", description="", type="system", deleted=False ): self.name = name self.description = description self.type = type self.deleted = deleted class UserGroupAssociation( object ): def __init__( self, user, group ): self.user = user self.group = group class UserRoleAssociation( object ): def __init__( self, user, role ): self.user = user self.role = role class GroupRoleAssociation( object ): def __init__( self, group, role ): self.group = group self.role = role class GalaxySession( object ): def __init__( self, id=None, user=None, remote_host=None, remote_addr=None, referer=None, current_history=None, session_key=None, is_valid=False, prev_session_id=None ): self.id = id self.user = user self.remote_host = remote_host self.remote_addr = remote_addr self.referer = referer self.current_history = current_history self.session_key = session_key self.is_valid = is_valid self.prev_session_id = prev_session_id class Tool( object ): file_path = '/tmp' states = Bunch( NEW = 'new', ERROR = 'error', DELETED = 'deleted', WAITING = 'waiting', APPROVED = 'approved', REJECTED = 'rejected', ARCHIVED = 'archived' ) def __init__( self, guid=None, tool_id=None, name=None, description=None, user_description=None, category=None, version=None, user_id=None, external_filename=None, suite=False ): self.guid = guid self.tool_id = tool_id self.name = name or "Unnamed tool" self.description = description self.user_description = user_description self.version = version or "1.0.0" self.user_id = user_id self.external_filename = external_filename self.deleted = False self.__extension = None self.suite = suite def get_file_name( self ): if not self.external_filename: assert self.id is not None, "ID must be set before filename used (commit the object)" dir = os.path.join( self.file_path, 'tools', *directory_hash_id( self.id ) ) # Create directory if it does not exist if not os.path.exists( dir ): os.makedirs( dir ) # Return filename inside hashed directory filename = os.path.join( dir, "tool_%d.dat" % self.id ) else: filename = self.external_filename # Make filename absolute return os.path.abspath( filename ) def set_file_name( self, filename ): if not filename: self.external_filename = None else: self.external_filename = filename file_name = property( get_file_name, set_file_name ) def create_from_datatype( self, datatype_bunch ): # TODO: ensure guid is unique and generate a new one if not. self.guid = datatype_bunch.guid self.tool_id = datatype_bunch.id self.name = datatype_bunch.name self.description = datatype_bunch.description self.version = datatype_bunch.version self.user_id = datatype_bunch.user.id self.suite = datatype_bunch.suite @property def state( self ): latest_event = self.latest_event if latest_event: return latest_event.state return None @property def latest_event( self ): if self.events: events = [ tea.event for tea in self.events ] # Get the last event that occurred ( events mapper is sorted descending ) return events[0] return None # Tool states @property def is_new( self ): return self.state == self.states.NEW @property def is_error( self ): return self.state == self.states.ERROR @property def is_deleted( self ): return self.state == self.states.DELETED @property def is_waiting( self ): return self.state == self.states.WAITING @property def is_approved( self ): return self.state == self.states.APPROVED @property def is_rejected( self ): return self.state == self.states.REJECTED @property def is_archived( self ): return self.state == self.states.ARCHIVED def get_state_message( self ): if self.is_suite: label = 'tool suite' else: label = 'tool' if self.is_new: return 'This is an unsubmitted version of this %s' % label if self.is_error: return 'This %s is in an error state' % label if self.is_deleted: return 'This is a deleted version of this %s' % label if self.is_waiting: return 'This version of this %s is awaiting administrative approval' % label if self.is_approved: return 'This is the latest approved version of this %s' % label if self.is_rejected: return 'This version of this %s has been rejected by an administrator' % label if self.is_archived: return 'This is an archived version of this %s' % label @property def extension( self ): # if instantiated via a query, this unmapped property won't exist if '_Tool__extension' not in dir( self ): self.__extension = None if self.__extension is None: head = open( self.file_name, 'rb' ).read( 4 ) try: assert head[:3] == 'BZh' assert int( head[-1] ) in range( 0, 10 ) self.__extension = 'tar.bz2' except AssertionError: pass if self.__extension is None: try: assert head[:2] == '\037\213' self.__extension = 'tar.gz' except: pass if self.__extension is None: self.__extension = 'tar' return self.__extension @property def is_suite( self ): return self.suite @property def label( self ): if self.is_suite: return 'tool suite' else: return 'tool' @property def download_file_name( self ): return '%s_%s.%s' % ( self.tool_id, self.version, self.extension ) @property def mimetype( self ): return mimetypes.guess_type( self.download_file_name )[0] class Event( object ): def __init__( self, state=None, comment='' ): self.state = state self.comment = comment class ToolEventAssociation( object ): def __init__( self, tool=None, event=None ): self.tool = tool self.event = event class ItemRatingAssociation( object ): def __init__( self, id=None, user=None, item=None, rating=0, comment='' ): self.id = id self.user = user self.item = item self.rating = rating self.comment = comment def set_item( self, item ): """ Set association's item. """ pass class ToolRatingAssociation( ItemRatingAssociation ): def set_item( self, tool ): self.tool = tool class Category( object ): def __init__( self, name=None, description=None, deleted=False ): self.name = name self.description = description self.deleted = deleted class ToolCategoryAssociation( object ): def __init__( self, tool=None, category=None ): self.tool = tool self.category = category class Tag ( object ): def __init__( self, id=None, type=None, parent_id=None, name=None ): self.id = id self.type = type self.parent_id = parent_id self.name = name def __str__ ( self ): return "Tag(id=%s, type=%i, parent_id=%s, name=%s)" % ( self.id, self.type, self.parent_id, self.name ) class ItemTagAssociation ( object ): def __init__( self, id=None, user=None, item_id=None, tag_id=None, user_tname=None, value=None ): self.id = id self.user = user self.item_id = item_id self.tag_id = tag_id self.user_tname = user_tname self.value = None self.user_value = None class ToolTagAssociation ( ItemTagAssociation ): pass class ToolAnnotationAssociation( object ): pass ## ---- Utility methods ------------------------------------------------------- def sort_by_attr( seq, attr ): """ Sort the sequence of objects by object's attribute Arguments: seq - the list or any sequence (including immutable one) of objects to sort. attr - the name of attribute to sort by """ # Use the "Schwartzian transform" # Create the auxiliary list of tuples where every i-th tuple has form # (seq[i].attr, i, seq[i]) and sort it. The second item of tuple is needed not # only to provide stable sorting, but mainly to eliminate comparison of objects # (which can be expensive or prohibited) in case of equal attribute values. intermed = map( None, map( getattr, seq, ( attr, ) * len( seq ) ), xrange( len( seq ) ), seq ) intermed.sort() return map( operator.getitem, intermed, ( -1, ) * len( intermed ) ) def directory_hash_id( id ): s = str( id ) l = len( s ) # Shortcut -- ids 0-999 go under ../000/ if l < 4: return [ "000" ] # Pad with zeros until a multiple of three padded = ( ( ( 3 - len( s ) ) % 3 ) * "0" ) + s # Drop the last three digits -- 1000 files per directory padded = padded[:-3] # Break into chunks of three return [ padded[i*3:(i+1)*3] for i in range( len( padded ) // 3 ) ]