from galaxy import model
from galaxy.model.item_attrs import *
from galaxy.web.base.controller import *
from galaxy.web.framework.helpers import time_ago, grids, iff
from galaxy.util.sanitize_html import sanitize_html
class VisualizationListGrid( grids.Grid ):
    """Grid of the current user's saved, non-deleted visualizations."""

    # ---- Grid definition ----
    title = "Saved Visualizations"
    model_class = model.Visualization
    # Leading '-' on the sort key; the key names the update_time column.
    default_sort_key = "-update_time"
    default_filter = { "title": "All", "deleted": "False", "tags": "All", "sharing": "All" }

    columns = [
        # Title links to the tracks browser for the visualization.
        grids.TextColumn(
            "Title",
            key="title",
            attach_popup=True,
            link=( lambda item: { "controller": "tracks", "action": "browser", "id": item.id } )
        ),
        grids.TextColumn( "Dbkey", key="dbkey" ),
        grids.IndividualTagsColumn(
            "Tags",
            key="tags",
            model_tag_association_class=model.VisualizationTagAssociation,
            filterable="advanced",
            grid_name="VisualizationListGrid"
        ),
        grids.SharingStatusColumn( "Sharing", key="sharing", filterable="advanced", sortable=False ),
        grids.GridColumn( "Created", key="create_time", format=time_ago ),
        grids.GridColumn( "Last Updated", key="update_time", format=time_ago ),
    ]
    # Hidden free-text search column covering the title and tags columns.
    columns.append(
        grids.MulticolFilterColumn(
            "Search",
            cols_to_filter=[ columns[0], columns[2] ],
            key="free-text-search",
            visible=False,
            filterable="standard"
        )
    )

    global_actions = [
        grids.GridAction( "Create new visualization", { "action": "create" } )
    ]

    operations = [
        grids.GridOperation(
            "View/Edit",
            allow_multiple=False,
            url_args={ "controller": "tracks", "action": "browser" }
        ),
        grids.GridOperation(
            "Edit Attributes",
            allow_multiple=False,
            url_args={ "action": "edit" }
        ),
        grids.GridOperation(
            "Clone",
            allow_multiple=False,
            condition=( lambda item: not item.deleted ),
            async_compatible=False,
            url_args={ "action": "clone" }
        ),
        grids.GridOperation(
            "Share or Publish",
            allow_multiple=False,
            condition=( lambda item: not item.deleted ),
            async_compatible=False
        ),
        grids.GridOperation(
            "Delete",
            condition=( lambda item: not item.deleted ),
            async_compatible=True,
            confirm="Are you sure you want to delete this visualization?"
        ),
    ]

    def apply_query_filter( self, trans, query, **kwargs ):
        """Restrict `query` to non-deleted visualizations belonging to `trans.user`."""
        owned_and_live = { "user": trans.user, "deleted": False }
        return query.filter_by( **owned_and_live )
class VisualizationAllPublishedGrid( grids.Grid ):
    """Grid of all published, non-deleted visualizations."""

    # ---- Grid definition ----
    use_panels = True
    use_async = True
    title = "Published Visualizations"
    model_class = model.Visualization
    default_sort_key = "update_time"
    default_filter = { "title": "All", "username": "All" }

    columns = [
        grids.PublicURLColumn( "Title", key="title", filterable="advanced" ),
        grids.OwnerAnnotationColumn(
            "Annotation",
            key="annotation",
            model_annotation_association_class=model.VisualizationAnnotationAssociation,
            filterable="advanced"
        ),
        grids.OwnerColumn( "Owner", key="owner", model_class=model.User, filterable="advanced" ),
        grids.CommunityRatingColumn( "Community Rating", key="rating" ),
        grids.CommunityTagsColumn(
            "Community Tags",
            key="tags",
            model_tag_association_class=model.VisualizationTagAssociation,
            filterable="advanced",
            grid_name="VisualizationAllPublishedGrid"
        ),
        grids.ReverseSortColumn( "Last Updated", key="update_time", format=time_ago )
    ]
    # Hidden free-text search column over the first four columns
    # (title, annotation, owner, rating).
    # NOTE(review): this includes the rating column rather than the tags
    # column -- confirm that is intended.
    columns.append(
        grids.MulticolFilterColumn(
            "Search",
            cols_to_filter=columns[:4],
            key="free-text-search",
            visible=False,
            filterable="standard"
        )
    )

    def build_initial_query( self, trans, **kwargs ):
        """Return the base query for this grid: model_class joined to the user table."""
        # Joined with the user table, presumably so filtering on the owner works.
        base_query = trans.sa_session.query( self.model_class )
        return base_query.join( model.User.table )

    def apply_query_filter( self, trans, query, **kwargs ):
        """Restrict `query` to visualizations that are published and not deleted."""
        visualization = self.model_class
        # These build SQL expressions, so '==' must not be replaced by 'is'/'not'.
        not_deleted = ( visualization.deleted == False )
        is_published = ( visualization.published == True )
        return query.filter( not_deleted ).filter( is_published )
class VisualizationController( BaseController, Sharable, UsesAnnotations, UsesVisualization, UsesItemRatings ):
    """Web controller for listing, creating, editing, cloning, importing,
    sharing, rating and displaying visualizations."""

    _user_list_grid = VisualizationListGrid()
    _published_list_grid = VisualizationAllPublishedGrid()

    # Help texts shared by the create and edit forms.
    _SLUG_HELP = ( "A unique identifier that will be used for\n"
                   "public links to this visualization. A default is generated\n"
                   "from the visualization title, but can be edited. This field\n"
                   "must contain only lowercase letters, numbers, and\n"
                   "the '-' character." )
    _ANNOTATION_HELP = "A description of the visualization; annotation is shown alongside published visualizations."

    @web.expose
    def list_published( self, trans, *args, **kwargs ):
        """ List published visualizations; returns the bare grid for async requests. """
        grid = self._published_list_grid( trans, **kwargs )
        if 'async' in kwargs:
            return grid
        # Render grid wrapped in panels
        return trans.fill_template( "visualization/list_published.mako", grid=grid )

    @web.expose
    @web.require_login( "use Galaxy visualizations", use_panels=True )
    def index( self, trans, *args, **kwargs ):
        """ Lists user's saved visualizations. """
        # Unpack the arguments so that list() sees request parameters such as
        # 'operation' and 'id' as keywords rather than as two positional values.
        return self.list( trans, *args, **kwargs )

    @web.expose
    @web.require_login()
    def clone( self, trans, id, *args, **kwargs ):
        """ Clone a visualization that is owned by, or shared with, the current user. """
        viz = self.get_visualization( trans, id, check_ownership=False )
        user = trans.get_user()
        owner = ( viz.user == user )
        if not owner:
            # Non-owners may only clone visualizations explicitly shared with them.
            share_count = trans.sa_session.query( model.VisualizationUserShareAssociation ) \
                .filter_by( user=user, visualization=viz ).count()
            if share_count == 0:
                error( "Visualization is not owned by or shared with current user" )
        new_viz = model.Visualization()
        new_viz.title = "Clone of '%s'" % viz.title
        new_viz.dbkey = viz.dbkey
        new_viz.type = viz.type
        new_viz.latest_revision = viz.latest_revision
        if not owner:
            new_viz.title += " shared by '%s'" % viz.user.email
        new_viz.user = user
        # Persist
        session = trans.sa_session
        session.add( new_viz )
        session.flush()
        # Display the management page
        trans.set_message( 'Clone created with name "%s"' % new_viz.title )
        return self.list( trans )

    @web.expose
    @web.require_login( "use Galaxy visualizations", use_panels=True )
    def list( self, trans, *args, **kwargs ):
        """ List the user's visualizations, first applying any grid operation
        ('delete' or 'share or publish') requested via 'operation' and 'id'. """
        # Handle operation
        if 'operation' in kwargs and 'id' in kwargs:
            session = trans.sa_session
            operation = kwargs['operation'].lower()
            for encoded_id in util.listify( kwargs['id'] ):
                if operation == "share or publish":
                    # sharing() performs its own lookup and ownership check.
                    return self.sharing( trans, **kwargs )
                if operation == "delete":
                    # Look the item up with the ownership check requested so that
                    # a user cannot delete a visualization by guessing its id.
                    item = self.get_visualization( trans, encoded_id, check_ownership=True )
                    item.deleted = True
            session.flush()
        # Build list of visualizations shared with user.
        shared_by_others = trans.sa_session \
            .query( model.VisualizationUserShareAssociation ) \
            .filter_by( user=trans.get_user() ) \
            .join( model.Visualization.table ) \
            .filter( model.Visualization.deleted == False ) \
            .order_by( desc( model.Visualization.update_time ) ) \
            .all()
        return trans.fill_template( "visualization/list.mako",
                                    grid=self._user_list_grid( trans, *args, **kwargs ),
                                    shared_by_others=shared_by_others )

    @web.expose
    @web.require_login( "modify Galaxy visualizations" )
    def set_slug_async( self, trans, id, new_slug ):
        """ Set item slug asynchronously. """
        visualization = self.get_visualization( trans, id )
        if visualization:
            visualization.slug = new_slug
            trans.sa_session.flush()
            return visualization.slug

    @web.expose
    @web.require_login( "use Galaxy visualizations" )
    def set_accessible_async( self, trans, id=None, accessible=False ):
        """ Set visualization's importable attribute and slug. """
        visualization = self.get_visualization( trans, id )
        # Only set if importable value would change; this prevents a change in the update_time unless attribute really changed.
        importable = accessible in ( 'True', 'true', 't', 'T' )
        if visualization and visualization.importable != importable:
            if importable:
                self._make_item_accessible( trans.sa_session, visualization )
            else:
                visualization.importable = importable
            trans.sa_session.flush()
        return

    @web.expose
    @web.require_login( "rate items" )
    @web.json
    def rate_async( self, trans, id, rating ):
        """ Rate a visualization asynchronously and return updated community data. """
        visualization = self.get_visualization( trans, id, check_ownership=False, check_accessible=True )
        if not visualization:
            return trans.show_error_message( "The specified visualization does not exist." )
        # Rate visualization.
        self.rate_item( trans.sa_session, trans.get_user(), visualization, rating )
        return self.get_ave_item_rating_data( trans.sa_session, visualization )

    @web.expose
    @web.require_login( "share Galaxy visualizations" )
    def imp( self, trans, id ):
        """ Import a visualization into user's workspace. """
        # quoteattr escapes and quotes a value for use as an HTML/XML attribute;
        # the referer is supplied by the client and is embedded in markup below.
        from xml.sax.saxutils import quoteattr
        # Set referer message.
        referer = trans.request.referer
        if referer:
            referer_message = "<a href=%s>return to the previous page</a>" % quoteattr( referer )
        else:
            referer_message = "<a href='%s'>go to Galaxy's start page</a>" % url_for( '/' )
        # Do import.
        session = trans.sa_session
        visualization = self.get_visualization( trans, id, check_ownership=False )
        if visualization.importable == False:
            return trans.show_error_message( "The owner of this visualization has disabled imports via this link.<br>You can %s" % referer_message, use_panels=True )
        elif visualization.user == trans.user:
            return trans.show_error_message( "You can't import this visualization because you own it.<br>You can %s" % referer_message, use_panels=True )
        elif visualization.deleted:
            return trans.show_error_message( "You can't import this visualization because it has been deleted.<br>You can %s" % referer_message, use_panels=True )
        else:
            # Create imported visualization via copy. TODO: Visualizations use datasets -- do we need to check to ensure that
            # datasets can be imported/viewed and/or copy datasets to user?
            imported_visualization = model.Visualization()
            imported_visualization.title = "imported: " + visualization.title
            # Carry over dbkey and type as clone() does, so the copy is complete.
            imported_visualization.dbkey = visualization.dbkey
            imported_visualization.type = visualization.type
            imported_visualization.latest_revision = visualization.latest_revision
            imported_visualization.user = trans.user
            # Save new visualization.
            session.add( imported_visualization )
            session.flush()
            # Redirect to load galaxy frames.
            return trans.show_ok_message(
                message="""Visualization "%s" has been imported. <br>You can <a href="%s">start using this visualization</a> or %s."""
                % ( visualization.title, web.url_for( controller='visualization' ), referer_message ), use_panels=True )

    @web.expose
    @web.require_login( "share Galaxy visualizations" )
    def sharing( self, trans, id, **kwargs ):
        """ Handle visualization sharing. """
        # Get session and visualization.
        session = trans.sa_session
        visualization = self.get_visualization( trans, id, check_ownership=True )
        # Do operation on visualization.
        if 'make_accessible_via_link' in kwargs:
            self._make_item_accessible( session, visualization )
        elif 'make_accessible_and_publish' in kwargs:
            self._make_item_accessible( session, visualization )
            visualization.published = True
        elif 'publish' in kwargs:
            visualization.published = True
        elif 'disable_link_access' in kwargs:
            visualization.importable = False
        elif 'unpublish' in kwargs:
            visualization.published = False
        elif 'disable_link_access_and_unpublish' in kwargs:
            visualization.importable = visualization.published = False
        elif 'unshare_user' in kwargs:
            user = session.query( model.User ).get( trans.security.decode_id( kwargs['unshare_user'] ) )
            if not user:
                error( "User not found for provided id" )
            association = session.query( model.VisualizationUserShareAssociation ) \
                .filter_by( user=user, visualization=visualization ).one()
            session.delete( association )
        session.flush()
        return trans.fill_template( "/sharing_base.mako", item=visualization, use_panels=True )

    @web.expose
    @web.require_login( "share Galaxy visualizations" )
    def share( self, trans, id=None, email="", use_panels=False ):
        """ Handle sharing a visualization with a particular user. """
        msg = mtype = None
        visualization = self.get_visualization( trans, id, check_ownership=True )
        if email:
            session = trans.sa_session
            other = session.query( model.User ) \
                .filter( and_( model.User.table.c.email==email,
                               model.User.table.c.deleted==False ) ) \
                .first()
            if not other:
                mtype = "error"
                msg = ( "User '%s' does not exist" % email )
            elif other == trans.get_user():
                mtype = "error"
                msg = ( "You cannot share a visualization with yourself" )
            elif session.query( model.VisualizationUserShareAssociation ) \
                    .filter_by( user=other, visualization=visualization ).count() > 0:
                mtype = "error"
                msg = ( "Visualization already shared with '%s'" % email )
            else:
                share_assoc = model.VisualizationUserShareAssociation()
                share_assoc.visualization = visualization
                share_assoc.user = other
                session.add( share_assoc )
                self.create_item_slug( session, visualization )
                session.flush()
                trans.set_message( "Visualization '%s' shared with user '%s'" % ( visualization.title, other.email ) )
                return trans.response.send_redirect( url_for( action='sharing', id=id ) )
        return trans.fill_template( "/ind_share_base.mako",
                                    message = msg,
                                    messagetype = mtype,
                                    item=visualization,
                                    email=email,
                                    use_panels=use_panels )

    @web.expose
    def display_by_username_and_slug( self, trans, username, slug ):
        """ Display visualization based on a username and slug. """
        # Get visualization.
        session = trans.sa_session
        user = session.query( model.User ).filter_by( username=username ).first()
        if user is None:
            # Unknown username: do not fall through to a query with user=None.
            raise web.httpexceptions.HTTPNotFound()
        visualization = session.query( model.Visualization ).filter_by( user=user, slug=slug, deleted=False ).first()
        if visualization is None:
            raise web.httpexceptions.HTTPNotFound()
        # Security check raises error if user cannot access visualization.
        self.security_check( trans.get_user(), visualization, False, True )
        # Get rating data.
        user_item_rating = 0
        if trans.get_user():
            user_item_rating = self.get_user_item_rating( session, trans.get_user(), visualization )
            if user_item_rating:
                user_item_rating = user_item_rating.rating
            else:
                user_item_rating = 0
        ave_item_rating, num_ratings = self.get_ave_item_rating_data( session, visualization )
        # Display.
        visualization_config = self.get_visualization_config( trans, visualization )
        return trans.stream_template_mako( "visualization/display.mako", item = visualization, item_data = visualization_config,
                                           user_item_rating = user_item_rating, ave_item_rating=ave_item_rating, num_ratings=num_ratings,
                                           content_only=True )

    @web.expose
    @web.json
    @web.require_login( "get item name and link" )
    def get_name_and_link_async( self, trans, id=None ):
        """ Returns visualization's name and link. """
        visualization = self.get_visualization( trans, id, check_ownership=False, check_accessible=True )
        # Flush only when create_item_slug reports (via a truthy return) that it did something.
        if self.create_item_slug( trans.sa_session, visualization ):
            trans.sa_session.flush()
        return_dict = { "name" : visualization.title,
                        "link" : url_for( action="display_by_username_and_slug", username=visualization.user.username, slug=visualization.slug ) }
        return return_dict

    @web.expose
    def get_item_content_async( self, trans, id ):
        """ Returns item content in HTML format. """
        # Get visualization, making sure it's accessible.
        visualization = self.get_visualization( trans, id, check_ownership=False, check_accessible=True )
        if visualization is None:
            raise web.httpexceptions.HTTPNotFound()
        # Return content.
        visualization_config = self.get_visualization_config( trans, visualization )
        return trans.fill_template_mako( "visualization/item_content.mako", encoded_id=trans.security.encode_id( visualization.id ),
                                         item=visualization, item_data=visualization_config, content_only=True )

    @web.expose
    @web.require_login( "create visualizations" )
    def create( self, trans, visualization_title="", visualization_slug="", visualization_annotation="", visualization_dbkey="" ):
        """
        Create a new visualization

        On a valid POST the visualization and its first (empty) revision are
        stored and the user is redirected to the list; otherwise the form is
        shown, carrying any validation error.
        """
        user = trans.get_user()
        visualization_title_err = visualization_slug_err = visualization_annotation_err = ""
        if trans.request.method == "POST":
            if not visualization_title:
                visualization_title_err = "visualization name is required"
            elif not visualization_slug:
                visualization_slug_err = "visualization id is required"
            elif not VALID_SLUG_RE.match( visualization_slug ):
                visualization_slug_err = "visualization identifier must consist of only lowercase letters, numbers, and the '-' character"
            elif trans.sa_session.query( model.Visualization ).filter_by( user=user, slug=visualization_slug, deleted=False ).first():
                visualization_slug_err = "visualization id must be unique"
            else:
                # Create the new stored visualization
                visualization = model.Visualization()
                visualization.title = visualization_title
                visualization.slug = visualization_slug
                visualization.dbkey = visualization_dbkey
                visualization.type = 'trackster' # HACK: set visualization type to trackster since it's the only viz
                visualization_annotation = sanitize_html( visualization_annotation, 'utf-8', 'text/html' )
                self.add_item_annotation( trans.sa_session, trans.get_user(), visualization, visualization_annotation )
                visualization.user = user
                # And the first (empty) visualization revision
                visualization_revision = model.VisualizationRevision()
                visualization_revision.title = visualization_title
                visualization_revision.config = {}
                visualization_revision.dbkey = visualization_dbkey
                visualization_revision.visualization = visualization
                visualization.latest_revision = visualization_revision
                # Persist
                session = trans.sa_session
                session.add( visualization )
                session.add( visualization_revision )
                session.flush()
                return trans.response.send_redirect( web.url_for( action='list' ) )
        return trans.show_form(
            web.FormBuilder( web.url_for(), "Create new visualization", submit_text="Submit" )
                .add_text( "visualization_title", "Visualization title", value=visualization_title, error=visualization_title_err )
                .add_text( "visualization_slug", "Visualization identifier", value=visualization_slug, error=visualization_slug_err,
                           help=self._SLUG_HELP )
                .add_select( "visualization_dbkey", "Visualization DbKey/Build", value=visualization_dbkey, options=self._get_dbkeys( trans ), error=None )
                .add_text( "visualization_annotation", "Visualization annotation", value=visualization_annotation, error=visualization_annotation_err,
                           help=self._ANNOTATION_HELP ),
            template="visualization/create.mako" )

    @web.expose
    @web.require_login( "edit visualizations" )
    def edit( self, trans, id, visualization_title="", visualization_slug="", visualization_annotation="" ):
        """
        Edit a visualization's attributes.

        On a valid POST the title, slug and (if non-empty) annotation are saved
        and the user is redirected to the list; on other requests the form is
        pre-filled from the stored visualization.
        """
        visualization = self.get_visualization( trans, id, check_ownership=True )
        session = trans.sa_session
        visualization_title_err = visualization_slug_err = visualization_annotation_err = ""
        if trans.request.method == "POST":
            if not visualization_title:
                visualization_title_err = "Visualization name is required"
            elif not visualization_slug:
                visualization_slug_err = "Visualization id is required"
            elif not VALID_SLUG_RE.match( visualization_slug ):
                visualization_slug_err = "Visualization identifier must consist of only lowercase letters, numbers, and the '-' character"
            elif visualization_slug != visualization.slug and session.query( model.Visualization ).filter_by( user=visualization.user, slug=visualization_slug, deleted=False ).first():
                visualization_slug_err = "Visualization id must be unique"
            else:
                visualization.title = visualization_title
                visualization.slug = visualization_slug
                if visualization_annotation != "":
                    visualization_annotation = sanitize_html( visualization_annotation, 'utf-8', 'text/html' )
                    self.add_item_annotation( session, trans.get_user(), visualization, visualization_annotation )
                session.flush()
                # Redirect to visualization list.
                return trans.response.send_redirect( web.url_for( action='list' ) )
        else:
            visualization_title = visualization.title
            # Create slug if it's not already set.
            if visualization.slug is None:
                self.create_item_slug( session, visualization )
            visualization_slug = visualization.slug
            visualization_annotation = self.get_item_annotation_str( session, trans.user, visualization )
            if not visualization_annotation:
                visualization_annotation = ""
        return trans.show_form(
            web.FormBuilder( web.url_for( id=id ), "Edit visualization attributes", submit_text="Submit" )
                .add_text( "visualization_title", "Visualization title", value=visualization_title, error=visualization_title_err )
                .add_text( "visualization_slug", "Visualization identifier", value=visualization_slug, error=visualization_slug_err,
                           help=self._SLUG_HELP )
                .add_text( "visualization_annotation", "Visualization annotation", value=visualization_annotation, error=visualization_annotation_err,
                           help=self._ANNOTATION_HELP ),
            template="visualization/create.mako" )