import logging, os, string, shutil, re, socket, mimetypes, smtplib, urllib, tempfile, zipfile, glob, sys
from galaxy.web.base.controller import *
from galaxy.web.framework.helpers import time_ago, iff, grids
from galaxy import util, datatypes, jobs, web, model
from cgi import escape, FieldStorage
from galaxy.datatypes.display_applications.util import encode_dataset_user, decode_dataset_user
from galaxy.util.sanitize_html import sanitize_html
from galaxy.model.item_attrs import *
from email.MIMEText import MIMEText
import pkg_resources;
pkg_resources.require( "Paste" )
import paste.httpexceptions
if sys.version_info[:2] < ( 2, 6 ):
zipfile.BadZipFile = zipfile.error
if sys.version_info[:2] < ( 2, 5 ):
zipfile.LargeZipFile = zipfile.error
tmpd = tempfile.mkdtemp()
comptypes=[]
ziptype = '32'
tmpf = os.path.join( tmpd, 'compression_test.zip' )
try:
archive = zipfile.ZipFile( tmpf, 'w', zipfile.ZIP_DEFLATED, True )
archive.close()
comptypes.append( 'zip' )
ziptype = '64'
except RuntimeError:
log.exception( "Compression error when testing zip compression. This option will be disabled for library downloads." )
except (TypeError, zipfile.LargeZipFile): # ZIP64 is only in Python2.5+. Remove TypeError when 2.4 support is dropped
log.warning( 'Max zip file size is 2GB, ZIP64 not supported' )
comptypes.append( 'zip' )
try:
os.unlink( tmpf )
except OSError:
pass
os.rmdir( tmpd )
log = logging.getLogger( __name__ )
error_report_template = """
GALAXY TOOL ERROR REPORT
------------------------
This error report was sent from the Galaxy instance hosted on the server
"${host}"
-----------------------------------------------------------------------------
This is in reference to dataset id ${dataset_id} from history id ${history_id}
-----------------------------------------------------------------------------
You should be able to view the history containing the related history item
${hid}: ${history_item_name}
by logging in as a Galaxy admin user to the Galaxy instance referenced above
and pointing your browser to the following link.
${history_view_link}
-----------------------------------------------------------------------------
The user '${email}' provided the following information:
${message}
-----------------------------------------------------------------------------
job id: ${job_id}
tool id: ${job_tool_id}
-----------------------------------------------------------------------------
job command line:
${job_command_line}
-----------------------------------------------------------------------------
job stderr:
${job_stderr}
-----------------------------------------------------------------------------
job stdout:
${job_stdout}
-----------------------------------------------------------------------------
job info:
${job_info}
-----------------------------------------------------------------------------
job traceback:
${job_traceback}
-----------------------------------------------------------------------------
(This is an automated message).
"""
class HistoryDatasetAssociationListGrid( grids.Grid ):
# Custom columns for grid.
class HistoryColumn( grids.GridColumn ):
def get_value( self, trans, grid, hda):
return hda.history.name
class StatusColumn( grids.GridColumn ):
def get_value( self, trans, grid, hda ):
if hda.deleted:
return "deleted"
return ""
def get_accepted_filters( self ):
""" Returns a list of accepted filters for this column. """
accepted_filter_labels_and_vals = { "Active" : "False", "Deleted" : "True", "All": "All" }
accepted_filters = []
for label, val in accepted_filter_labels_and_vals.items():
args = { self.key: val }
accepted_filters.append( grids.GridColumnFilter( label, args) )
return accepted_filters
# Grid definition
title = "Saved Datasets"
model_class = model.HistoryDatasetAssociation
template='/dataset/grid.mako'
default_sort_key = "-update_time"
columns = [
grids.TextColumn( "Name", key="name",
# Link name to dataset's history.
link=( lambda item: iff( item.history.deleted, None, dict( operation="switch", id=item.id ) ) ), filterable="advanced", attach_popup=True ),
HistoryColumn( "History", key="history",
link=( lambda item: iff( item.history.deleted, None, dict( operation="switch_history", id=item.id ) ) ) ),
grids.IndividualTagsColumn( "Tags", key="tags", model_tag_association_class=model.HistoryDatasetAssociationTagAssociation, filterable="advanced", grid_name="HistoryDatasetAssocationListGrid" ),
StatusColumn( "Status", key="deleted", attach_popup=False ),
grids.GridColumn( "Last Updated", key="update_time", format=time_ago ),
]
columns.append(
grids.MulticolFilterColumn(
"Search",
cols_to_filter=[ columns[0], columns[2] ],
key="free-text-search", visible=False, filterable="standard" )
)
operations = [
grids.GridOperation( "Copy to current history", condition=( lambda item: not item.deleted ), async_compatible=False ),
]
standard_filters = []
default_filter = dict( name="All", deleted="False", tags="All" )
preserve_state = False
use_paging = True
num_rows_per_page = 50
def build_initial_query( self, trans, **kwargs ):
# Show user's datasets that are not deleted, not in deleted histories, and not hidden.
# To filter HDAs by user, need to join model class/HDA and History table so that it is
# possible to filter by user. However, for dictionary-based filtering to work, need a
# primary table for the query.
return trans.sa_session.query( self.model_class ).select_from( self.model_class.table.join( model.History.table ) ) \
.filter( model.History.user == trans.user ) \
.filter( self.model_class.deleted==False ) \
.filter( model.History.deleted==False ) \
.filter( self.model_class.visible==True )
class DatasetInterface( BaseController, UsesAnnotations, UsesHistoryDatasetAssociation, UsesItemRatings ):
stored_list_grid = HistoryDatasetAssociationListGrid()
@web.expose
def errors( self, trans, id ):
hda = trans.sa_session.query( model.HistoryDatasetAssociation ).get( id )
return trans.fill_template( "dataset/errors.mako", hda=hda )
@web.expose
def stderr( self, trans, id ):
dataset = trans.sa_session.query( model.HistoryDatasetAssociation ).get( id )
job = dataset.creating_job_associations[0].job
trans.response.set_content_type( 'text/plain' )
return job.stderr
@web.expose
def report_error( self, trans, id, email='', message="" ):
smtp_server = trans.app.config.smtp_server
if smtp_server is None:
return trans.show_error_message( "Mail is not configured for this galaxy instance" )
to_address = trans.app.config.error_email_to
if to_address is None:
return trans.show_error_message( "Error reporting has been disabled for this galaxy instance" )
# Get the dataset and associated job
hda = trans.sa_session.query( model.HistoryDatasetAssociation ).get( id )
job = hda.creating_job_associations[0].job
# Get the name of the server hosting the Galaxy instance from which this report originated
host = trans.request.host
history_view_link = "%s/history/view?id=%s" % ( str( host ), trans.security.encode_id( hda.history_id ) )
# Build the email message
msg = MIMEText( string.Template( error_report_template )
.safe_substitute( host=host,
dataset_id=hda.dataset_id,
history_id=hda.history_id,
hid=hda.hid,
history_item_name=hda.get_display_name(),
history_view_link=history_view_link,
job_id=job.id,
job_tool_id=job.tool_id,
job_command_line=job.command_line,
job_stderr=job.stderr,
job_stdout=job.stdout,
job_info=job.info,
job_traceback=job.traceback,
email=email,
message=message ) )
frm = to_address
# Check email a bit
email = email.strip()
parts = email.split()
if len( parts ) == 1 and len( email ) > 0:
to = to_address + ", " + email
else:
to = to_address
msg[ 'To' ] = to
msg[ 'From' ] = frm
msg[ 'Subject' ] = "Galaxy tool error report from " + email
# Send it
try:
s = smtplib.SMTP()
s.connect( smtp_server )
s.sendmail( frm, [ to ], msg.as_string() )
s.close()
return trans.show_ok_message( "Your error report has been sent" )
except Exception, e:
return trans.show_error_message( "An error occurred sending the report by email: %s" % str( e ) )
@web.expose
def default(self, trans, dataset_id=None, **kwd):
return 'This link may not be followed from within Galaxy.'
@web.expose
def archive_composite_dataset( self, trans, data=None, **kwd ):
# save a composite object into a compressed archive for downloading
params = util.Params( kwd )
valid_chars = '.,^_-()[]0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
outfname = data.name[0:150]
outfname = ''.join(c in valid_chars and c or '_' for c in outfname)
if (params.do_action == None):
params.do_action = 'zip' # default
msg = util.restore_text( params.get( 'msg', '' ) )
messagetype = params.get( 'messagetype', 'done' )
if not data:
msg = "You must select at least one dataset"
messagetype = 'error'
else:
error = False
try:
if (params.do_action == 'zip'):
# Can't use mkstemp - the file must not exist first
tmpd = tempfile.mkdtemp()
tmpf = os.path.join( tmpd, 'library_download.' + params.do_action )
if ziptype == '64':
archive = zipfile.ZipFile( tmpf, 'w', zipfile.ZIP_DEFLATED, True )
else:
archive = zipfile.ZipFile( tmpf, 'w', zipfile.ZIP_DEFLATED )
archive.add = lambda x, y: archive.write( x, y.encode('CP437') )
elif params.do_action == 'tgz':
archive = util.streamball.StreamBall( 'w|gz' )
elif params.do_action == 'tbz':
archive = util.streamball.StreamBall( 'w|bz2' )
except (OSError, zipfile.BadZipFile):
error = True
log.exception( "Unable to create archive for download" )
msg = "Unable to create archive for %s for download, please report this error" % outfname
messagetype = 'error'
if not error:
current_user_roles = trans.get_current_user_roles()
ext = data.extension
path = data.file_name
fname = os.path.split(path)[-1]
efp = data.extra_files_path
htmlname = os.path.splitext(outfname)[0]
if not htmlname.endswith(ext):
htmlname = '%s_%s' % (htmlname,ext)
archname = '%s.html' % htmlname # fake the real nature of the html file
try:
archive.add(data.file_name,archname)
except IOError:
error = True
log.exception( "Unable to add composite parent %s to temporary library download archive" % data.file_name)
msg = "Unable to create archive for download, please report this error"
messagetype = 'error'
flist = glob.glob(os.path.join(efp,'*.*')) # glob returns full paths
for fpath in flist:
efp,fname = os.path.split(fpath)
try:
archive.add( fpath,fname )
except IOError:
error = True
log.exception( "Unable to add %s to temporary library download archive" % fname)
msg = "Unable to create archive for download, please report this error"
messagetype = 'error'
continue
if not error:
if params.do_action == 'zip':
archive.close()
tmpfh = open( tmpf )
# clean up now
try:
os.unlink( tmpf )
os.rmdir( tmpd )
except OSError:
error = True
msg = "Unable to remove temporary library download archive and directory"
log.exception( msg )
messagetype = 'error'
if not error:
trans.response.set_content_type( "application/x-zip-compressed" )
trans.response.headers[ "Content-Disposition" ] = "attachment; filename=%s.zip" % outfname
return tmpfh
else:
trans.response.set_content_type( "application/x-tar" )
outext = 'tgz'
if params.do_action == 'tbz':
outext = 'tbz'
trans.response.headers[ "Content-Disposition" ] = "attachment; filename=%s.%s" % (outfname,outext)
archive.wsgi_status = trans.response.wsgi_status()
archive.wsgi_headeritems = trans.response.wsgi_headeritems()
return archive.stream
return trans.show_error_message( msg )
@web.expose
def display(self, trans, dataset_id=None, preview=False, filename=None, to_ext=None, **kwd):
"""Catches the dataset id and displays file contents as directed"""
composite_extensions = trans.app.datatypes_registry.get_composite_extensions( )
composite_extensions.append('html') # for archiving composite datatypes
# DEPRECATION: We still support unencoded ids for backward compatibility
try:
data = trans.sa_session.query( trans.app.model.HistoryDatasetAssociation ).get( trans.security.decode_id( dataset_id ) )
if data is None:
raise ValueError( 'Invalid reference dataset id: %s.' % dataset_id )
except:
try:
data = trans.sa_session.query( trans.app.model.HistoryDatasetAssociation ).get( int( dataset_id ) )
except:
data = None
if not data:
raise paste.httpexceptions.HTTPRequestRangeNotSatisfiable( "Invalid reference dataset id: %s." % str( dataset_id ) )
current_user_roles = trans.get_current_user_roles()
if not trans.app.security_agent.can_access_dataset( current_user_roles, data.dataset ):
return trans.show_error_message( "You are not allowed to access this dataset" )
if data.state == trans.model.Dataset.states.UPLOAD:
return trans.show_error_message( "Please wait until this dataset finishes uploading before attempting to view it." )
if filename and filename != "index":
# For files in extra_files_path
file_path = os.path.join( data.extra_files_path, filename )
if os.path.exists( file_path ):
mime, encoding = mimetypes.guess_type( file_path )
if not mime:
try:
mime = trans.app.datatypes_registry.get_mimetype_by_extension( ".".split( file_path )[-1] )
except:
mime = "text/plain"
trans.response.set_content_type( mime )
return open( file_path )
else:
return "Could not find '%s' on the extra files path %s." % (filename,file_path)
mime = trans.app.datatypes_registry.get_mimetype_by_extension( data.extension.lower() )
trans.response.set_content_type(mime)
trans.log_event( "Display dataset id: %s" % str( dataset_id ) )
if to_ext or isinstance(data.datatype, datatypes.binary.Binary): # Saving the file, or binary file
if data.extension in composite_extensions:
return self.archive_composite_dataset( trans, data, **kwd )
else:
trans.response.headers['Content-Length'] = int( os.stat( data.file_name ).st_size )
if not to_ext:
to_ext = data.extension
valid_chars = '.,^_-()[]0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
fname = data.name
fname = ''.join(c in valid_chars and c or '_' for c in fname)[0:150]
trans.response.headers["Content-Disposition"] = "attachment; filename=Galaxy%s-[%s].%s" % (data.hid, fname, to_ext)
return open( data.file_name )
if not os.path.exists( data.file_name ):
raise paste.httpexceptions.HTTPNotFound( "File Not Found (%s)." % data.file_name )
max_peek_size = 1000000 # 1 MB
if not preview or isinstance(data.datatype, datatypes.images.Image) or os.stat( data.file_name ).st_size < max_peek_size:
return open( data.file_name )
else:
trans.response.set_content_type( "text/html" )
return trans.stream_template_mako( "/dataset/large_file.mako",
truncated_data = open( data.file_name ).read(max_peek_size),
data = data )
@web.expose
@web.require_login( "see all available datasets" )
def list( self, trans, **kwargs ):
"""List all available datasets"""
status = message = None
if 'operation' in kwargs:
operation = kwargs['operation'].lower()
hda_ids = util.listify( kwargs.get( 'id', [] ) )
# Display no message by default
status, message = None, None
# Load the hdas and ensure they all belong to the current user
hdas = []
for encoded_hda_id in hda_ids:
hda_id = trans.security.decode_id( encoded_hda_id )
hda = trans.sa_session.query( model.HistoryDatasetAssociation ).filter_by( id=hda_id ).first()
if hda:
# Ensure history is owned by current user
if hda.history.user_id != None and trans.user:
assert trans.user.id == hda.history.user_id, "HistoryDatasetAssocation does not belong to current user"
hdas.append( hda )
else:
log.warn( "Invalid history_dataset_association id '%r' passed to list", hda_id )
if hdas:
if operation == "switch" or operation == "switch_history":
# Switch to a history that the HDA resides in.
# Convert hda to histories.
histories = []
for hda in hdas:
histories.append( hda.history )
# Use history controller to switch the history. TODO: is this reasonable?
status, message = trans.webapp.controllers['history']._list_switch( trans, histories )
# Current history changed, refresh history frame; if switching to a dataset, set hda seek.
trans.template_context['refresh_frames'] = ['history']
if operation == "switch":
hda_ids = [ trans.security.encode_id( hda.id ) for hda in hdas ]
trans.template_context[ 'seek_hda_ids' ] = hda_ids
elif operation == "copy to current history":
# Copy a dataset to the current history.
target_histories = [ trans.get_history() ]
status, message = self._copy_datasets( trans, hda_ids, target_histories )
# Current history changed, refresh history frame.
trans.template_context['refresh_frames'] = ['history']
# Render the list view
return self.stored_list_grid( trans, status=status, message=message, **kwargs )
@web.expose
def imp( self, trans, dataset_id=None, **kwd ):
""" Import another user's dataset via a shared URL; dataset is added to user's current history. """
msg = ""
# Set referer message.
referer = trans.request.referer
if referer is not "":
referer_message = "return to the previous page" % referer
else:
referer_message = "go to Galaxy's start page" % url_for( '/' )
# Error checking.
if not dataset_id:
return trans.show_error_message( "You must specify a dataset to import. You can %s." % referer_message, use_panels=True )
# Do import.
cur_history = trans.get_history( create=True )
status, message = self._copy_datasets( trans, [ dataset_id ], [ cur_history ], imported=True )
message = "Dataset imported.
You can start using the dataset or %s." % ( url_for('/'), referer_message )
return trans.show_message( message, type=status, use_panels=True )
@web.expose
@web.json
@web.require_login( "use Galaxy datasets" )
def get_name_and_link_async( self, trans, id=None ):
""" Returns dataset's name and link. """
dataset = self.get_dataset( trans, id, False, True )
return_dict = { "name" : dataset.name, "link" : url_for( action="display_by_username_and_slug", username=dataset.history.user.username, slug=trans.security.encode_id( dataset.id ) ) }
return return_dict
@web.expose
def get_embed_html_async( self, trans, id ):
""" Returns HTML for embedding a dataset in a page. """
dataset = self.get_dataset( trans, id, False, True )
if dataset:
return "Embedded Dataset '%s'" % dataset.name
@web.expose
@web.require_login( "use Galaxy datasets" )
def set_accessible_async( self, trans, id=None, accessible=False ):
""" Does nothing because datasets do not have an importable/accessible attribute. This method could potentially set another attribute. """
return
@web.expose
@web.require_login( "rate items" )
@web.json
def rate_async( self, trans, id, rating ):
""" Rate a dataset asynchronously and return updated community data. """
dataset = self.get_dataset( trans, id, check_ownership=False, check_accessible=True )
if not dataset:
return trans.show_error_message( "The specified dataset does not exist." )
# Rate dataset.
dataset_rating = self.rate_item( rate_item, trans.get_user(), dataset, rating )
return self.get_ave_item_rating_data( trans.sa_session, dataset )
@web.expose
def display_by_username_and_slug( self, trans, username, slug, preview=True ):
""" Display dataset by username and slug; because datasets do not yet have slugs, the slug is the dataset's id. """
dataset = self.get_dataset( trans, slug, False, True )
if dataset:
truncated, dataset_data = self.get_data( dataset, preview )
dataset.annotation = self.get_item_annotation_str( trans.sa_session, dataset.history.user, dataset )
# If data is binary or an image, stream without template; otherwise, use display template.
# TODO: figure out a way to display images in display template.
if isinstance(dataset.datatype, datatypes.binary.Binary) or isinstance(dataset.datatype, datatypes.images.Image):
mime = trans.app.datatypes_registry.get_mimetype_by_extension( dataset.extension.lower() )
trans.response.set_content_type( mime )
return open( dataset.file_name )
else:
# Get rating data.
user_item_rating = 0
if trans.get_user():
user_item_rating = self.get_user_item_rating( trans.sa_session, trans.get_user(), dataset )
if user_item_rating:
user_item_rating = user_item_rating.rating
else:
user_item_rating = 0
ave_item_rating, num_ratings = self.get_ave_item_rating_data( trans.sa_session, dataset )
return trans.fill_template_mako( "/dataset/display.mako", item=dataset, item_data=dataset_data, truncated=truncated,
user_item_rating = user_item_rating, ave_item_rating=ave_item_rating, num_ratings=num_ratings )
else:
raise web.httpexceptions.HTTPNotFound()
@web.expose
def get_item_content_async( self, trans, id ):
""" Returns item content in HTML format. """
dataset = self.get_dataset( trans, id, False, True )
if dataset is None:
raise web.httpexceptions.HTTPNotFound()
truncated, dataset_data = self.get_data( dataset, preview=True )
# Get annotation.
dataset.annotation = self.get_item_annotation_str( trans.sa_session, trans.user, dataset )
return trans.stream_template_mako( "/dataset/item_content.mako", item=dataset, item_data=dataset_data, truncated=truncated )
@web.expose
def annotate_async( self, trans, id, new_annotation=None, **kwargs ):
dataset = self.get_dataset( trans, id, False, True )
if not dataset:
web.httpexceptions.HTTPNotFound()
if dataset and new_annotation:
# Sanitize annotation before adding it.
new_annotation = sanitize_html( new_annotation, 'utf-8', 'text/html' )
self.add_item_annotation( trans.sa_session, trans.get_user(), dataset, new_annotation )
trans.sa_session.flush()
return new_annotation
@web.expose
def get_annotation_async( self, trans, id ):
dataset = self.get_dataset( trans, id, False, True )
if not dataset:
web.httpexceptions.HTTPNotFound()
return self.get_item_annotation_str( trans.sa_session, trans.user, dataset )
@web.expose
def display_at( self, trans, dataset_id, filename=None, **kwd ):
"""Sets up a dataset permissions so it is viewable at an external site"""
site = filename
data = trans.sa_session.query( trans.app.model.HistoryDatasetAssociation ).get( dataset_id )
if not data:
raise paste.httpexceptions.HTTPRequestRangeNotSatisfiable( "Invalid reference dataset id: %s." % str( dataset_id ) )
if 'display_url' not in kwd or 'redirect_url' not in kwd:
return trans.show_error_message( 'Invalid parameters specified for "display at" link, please contact a Galaxy administrator' )
try:
redirect_url = kwd['redirect_url'] % urllib.quote_plus( kwd['display_url'] )
except:
redirect_url = kwd['redirect_url'] # not all will need custom text
current_user_roles = trans.get_current_user_roles()
if trans.app.security_agent.dataset_is_public( data.dataset ):
return trans.response.send_redirect( redirect_url ) # anon access already permitted by rbac
if trans.app.security_agent.can_access_dataset( current_user_roles, data.dataset ):
trans.app.host_security_agent.set_dataset_permissions( data, trans.user, site )
return trans.response.send_redirect( redirect_url )
else:
return trans.show_error_message( "You are not allowed to view this dataset at external sites. Please contact your Galaxy administrator to acquire management permissions for this dataset." )
@web.expose
def display_application( self, trans, dataset_id=None, user_id=None, app_name = None, link_name = None, app_action = None, action_param = None, **kwds ):
"""Access to external display applications"""
if kwds:
log.debug( "Unexpected Keywords passed to display_application: %s" % kwds ) #route memory?
#decode ids
data, user = decode_dataset_user( trans, dataset_id, user_id )
if not data:
raise paste.httpexceptions.HTTPRequestRangeNotSatisfiable( "Invalid reference dataset id: %s." % str( dataset_id ) )
if user is None:
user = trans.user
if user:
user_roles = user.all_roles()
else:
user_roles = []
if None in [ app_name, link_name ]:
return trans.show_error_message( "A display application name and link name must be provided." )
if trans.app.security_agent.can_access_dataset( user_roles, data.dataset ):
msg = []
refresh = False
display_app = trans.app.datatypes_registry.display_applications.get( app_name )
assert display_app, "Unknown display application has been requested: %s" % app_name
dataset_hash, user_hash = encode_dataset_user( trans, data, user )
display_link = display_app.get_link( link_name, data, dataset_hash, user_hash, trans )
assert display_link, "Unknown display link has been requested: %s" % link_name
if data.state == data.states.ERROR:
msg.append( ( 'This dataset is in an error state, you cannot view it at an external display application.', 'error' ) )
elif data.deleted:
msg.append( ( 'This dataset has been deleted, you cannot view it at an external display application.', 'error' ) )
elif data.state != data.states.OK:
msg.append( ( 'You must wait for this dataset to be created before you can view it at an external display application.', 'info' ) )
refresh = True
else:
#We have permissions, dataset is not deleted and is in OK state, allow access
if display_link.display_ready():
if app_action in [ 'data', 'param' ]:
assert action_param, "An action param must be provided for a data or param action"
#data is used for things with filenames that could be passed off to a proxy
#in case some display app wants all files to be in the same 'directory',
#data can be forced to param, but not the other way (no filename for other direction)
#get param name from url param name
action_param = display_link.get_param_name_by_url( action_param )
value = display_link.get_param_value( action_param )
assert value, "An invalid parameter name was provided: %s" % action_param
assert value.parameter.viewable, "This parameter is not viewable."
if value.parameter.type == 'data':
content_length = os.path.getsize( value.file_name )
rval = open( value.file_name )
else:
rval = str( value )
content_length = len( rval )
trans.response.set_content_type( value.mime_type() )
trans.response.headers[ 'Content-Length' ] = content_length
return rval
elif app_action == None:
#redirect user to url generated by display link
return trans.response.send_redirect( display_link.display_url() )
else:
msg.append( ( 'Invalid action provided: %s' % app_action, 'error' ) )
else:
if app_action == None:
if trans.history != data.history:
msg.append( ( 'You must import this dataset into your current history before you can view it at the desired display application.', 'error' ) )
else:
refresh = True
msg.append( ( 'This display application is being prepared.', 'info' ) )
if not display_link.preparing_display():
display_link.prepare_display()
else:
raise Exception( 'Attempted a view action (%s) on a non-ready display application' % app_action )
return trans.fill_template_mako( "dataset/display_application/display.mako", msg = msg, display_app = display_app, display_link = display_link, refresh = refresh )
return trans.show_error_message( 'You do not have permission to view this dataset at an external display application.' )
def _undelete( self, trans, id ):
try:
id = int( id )
except ValueError, e:
return False
history = trans.get_history()
data = trans.sa_session.query( self.app.model.HistoryDatasetAssociation ).get( id )
if data and data.undeletable:
# Walk up parent datasets to find the containing history
topmost_parent = data
while topmost_parent.parent:
topmost_parent = topmost_parent.parent
assert topmost_parent in history.datasets, "Data does not belong to current history"
# Mark undeleted
data.mark_undeleted()
trans.sa_session.flush()
trans.log_event( "Dataset id %s has been undeleted" % str(id) )
return True
return False
def _unhide( self, trans, id ):
try:
id = int( id )
except ValueError, e:
return False
history = trans.get_history()
data = trans.sa_session.query( self.app.model.HistoryDatasetAssociation ).get( id )
if data:
# Walk up parent datasets to find the containing history
topmost_parent = data
while topmost_parent.parent:
topmost_parent = topmost_parent.parent
assert topmost_parent in history.datasets, "Data does not belong to current history"
# Mark undeleted
data.mark_unhidden()
trans.sa_session.flush()
trans.log_event( "Dataset id %s has been unhidden" % str(id) )
return True
return False
@web.expose
def undelete( self, trans, id ):
if self._undelete( trans, id ):
return trans.response.send_redirect( web.url_for( controller='root', action='history', show_deleted = True ) )
raise "Error undeleting"
@web.expose
def unhide( self, trans, id ):
if self._unhide( trans, id ):
return trans.response.send_redirect( web.url_for( controller='root', action='history', show_hidden = True ) )
raise "Error unhiding"
@web.expose
def undelete_async( self, trans, id ):
if self._undelete( trans, id ):
return "OK"
raise "Error undeleting"
@web.expose
def copy_datasets( self, trans, source_dataset_ids="", target_history_ids="", new_history_name="", do_copy=False, **kwd ):
params = util.Params( kwd )
user = trans.get_user()
history = trans.get_history()
create_new_history = False
refresh_frames = []
if source_dataset_ids:
if not isinstance( source_dataset_ids, list ):
source_dataset_ids = source_dataset_ids.split( "," )
source_dataset_ids = map( int, source_dataset_ids )
else:
source_dataset_ids = []
if target_history_ids:
if not isinstance( target_history_ids, list ):
target_history_ids = target_history_ids.split( "," )
if "create_new_history" in target_history_ids:
create_new_history = True
target_history_ids.remove( "create_new_history" )
target_history_ids = map( int, target_history_ids )
else:
target_history_ids = []
done_msg = error_msg = ""
if do_copy:
invalid_datasets = 0
if not source_dataset_ids or not ( target_history_ids or create_new_history ):
error_msg = "You must provide both source datasets and target histories."
if create_new_history:
target_history_ids.append( "create_new_history" )
else:
if create_new_history:
new_history = trans.app.model.History()
if new_history_name:
new_history.name = new_history_name
new_history.user = user
trans.sa_session.add( new_history )
trans.sa_session.flush()
target_history_ids.append( new_history.id )
if user:
target_histories = [ hist for hist in map( trans.sa_session.query( trans.app.model.History ).get, target_history_ids ) if ( hist is not None and hist.user == user )]
else:
target_histories = [ history ]
if len( target_histories ) != len( target_history_ids ):
error_msg = error_msg + "You do not have permission to add datasets to %i requested histories. " % ( len( target_history_ids ) - len( target_histories ) )
for data in map( trans.sa_session.query( trans.app.model.HistoryDatasetAssociation ).get, source_dataset_ids ):
if data is None:
error_msg = error_msg + "You tried to copy a dataset that does not exist. "
invalid_datasets += 1
elif data.history != history:
error_msg = error_msg + "You tried to copy a dataset which is not in your current history. "
invalid_datasets += 1
else:
for hist in target_histories:
hist.add_dataset( data.copy( copy_children = True ) )
if history in target_histories:
refresh_frames = ['history']
trans.sa_session.flush()
done_msg = "%i datasets copied to %i histories." % ( len( source_dataset_ids ) - invalid_datasets, len( target_histories ) )
trans.sa_session.refresh( history )
elif create_new_history:
target_history_ids.append( "create_new_history" )
source_datasets = history.active_datasets
target_histories = [history]
if user:
target_histories = user.active_histories
return trans.fill_template( "/dataset/copy_view.mako",
source_dataset_ids = source_dataset_ids,
target_history_ids = target_history_ids,
source_datasets = source_datasets,
target_histories = target_histories,
new_history_name = new_history_name,
done_msg = done_msg,
error_msg = error_msg,
refresh_frames = refresh_frames )
def _copy_datasets( self, trans, dataset_ids, target_histories, imported=False ):
""" Helper method for copying datasets. """
user = trans.get_user()
done_msg = error_msg = ""
invalid_datasets = 0
if not dataset_ids or not target_histories:
error_msg = "You must provide both source datasets and target histories."
else:
# User must own target histories to copy datasets to them.
for history in target_histories:
if user != history.user:
error_msg = error_msg + "You do not have permission to add datasets to %i requested histories. " % ( len( target_histories ) )
for dataset_id in dataset_ids:
data = self.get_dataset( trans, dataset_id, False, True )
if data is None:
error_msg = error_msg + "You tried to copy a dataset that does not exist or that you do not have access to. "
invalid_datasets += 1
else:
for hist in target_histories:
dataset_copy = data.copy( copy_children = True )
if imported:
dataset_copy.name = "imported: " + dataset_copy.name
hist.add_dataset( dataset_copy )
trans.sa_session.flush()
num_datasets_copied = len( dataset_ids ) - invalid_datasets
done_msg = "%i dataset%s copied to %i histor%s." % \
( num_datasets_copied, iff( num_datasets_copied == 1, "", "s"), len( target_histories ), iff( len ( target_histories ) == 1, "y", "ies") )
trans.sa_session.refresh( history )
if error_msg != "":
status = ERROR
message = error_msg
else:
status = SUCCESS
message = done_msg
return status, message