from galaxy.web.base.controller import *
from galaxy.web.framework.helpers import time_ago, iff, grids
from galaxy.model.orm import *
from galaxy import model, util
from galaxy.web.form_builder import *
import logging, os, csv
log = logging.getLogger( __name__ )
class RequestsGrid( grids.Grid ):
    """
    Grid definition for the "Sequencing Requests" listing.

    Declares the custom column types used to render a request (name,
    description, sample count, sequencer type and state), the columns
    themselves, a free-text search over name and description, and a single
    "Submit" operation offered for undeleted, unsubmitted requests that have
    at least one sample.
    """
    # Custom column types
    class NameColumn( grids.TextColumn ):
        """Column showing the request's name."""
        def get_value( self, trans, grid, request ):
            return request.name
    class DescriptionColumn( grids.TextColumn ):
        """Column showing the request's description."""
        def get_value( self, trans, grid, request ):
            return request.desc
    class SamplesColumn( grids.GridColumn ):
        """Column showing how many samples the request has."""
        def get_value( self, trans, grid, request ):
            return str( len( request.samples ) )
    class TypeColumn( grids.TextColumn ):
        """Column showing the name of the request's type (labelled "Sequencer")."""
        def get_value( self, trans, grid, request ):
            return request.type.name
    class StateColumn( grids.StateColumn ):
        """Column showing the request's state, with state-based filtering."""
        def get_value( self, trans, grid, request ):
            """Return the markup for the request's state, tagged with a color class."""
            state = request.state
            if state == request.states.REJECTED:
                state_color = 'error'
            elif state == request.states.NEW:
                state_color = 'new'
            elif state == request.states.SUBMITTED:
                state_color = 'running'
            elif state == request.states.COMPLETE:
                state_color = 'ok'
            else:
                state_color = state
            # NOTE(review): the markup of this literal was lost in the copy of the file
            # under review (only a single '%s' survived for two arguments). It has been
            # reconstructed so that both values are interpolated -- confirm the element
            # and class names against the grid template / stylesheet.
            return '<div class="count-box state-color-%s">%s</div>' % ( state_color, state )
        def filter( self, trans, user, query, column_filter ):
            """ Modify query to filter request by state. """
            if column_filter == "All":
                return query
            if column_filter:
                # Keep only requests whose most recent event (highest event id per
                # request) carries the requested state.
                return query.join( model.RequestEvent.table ) \
                            .filter( self.model_class.table.c.id == model.RequestEvent.table.c.request_id ) \
                            .filter( model.RequestEvent.table.c.state == column_filter ) \
                            .filter( model.RequestEvent.table.c.id.in_( select( columns=[ func.max( model.RequestEvent.table.c.id ) ],
                                                                                from_obj=model.RequestEvent.table,
                                                                                group_by=model.RequestEvent.table.c.request_id ) ) )
            # No filter value supplied: leave the query untouched instead of
            # implicitly returning None to the caller.
            return query
        def get_accepted_filters( self ):
            """ Returns a list of accepted filters for this column. """
            # TODO: is this method necessary?
            accepted_filter_labels_and_vals = [ model.Request.states.get( state ) for state in model.Request.states ]
            accepted_filter_labels_and_vals.append( "All" )
            accepted_filters = []
            for val in accepted_filter_labels_and_vals:
                label = val.lower()
                args = { self.key: val }
                accepted_filters.append( grids.GridColumnFilter( label, args ) )
            return accepted_filters
    # Grid definition
    title = "Sequencing Requests"
    template = "requests/grid.mako"
    model_class = model.Request
    default_sort_key = "-update_time"
    num_rows_per_page = 50
    preserve_state = True
    use_paging = True
    default_filter = dict( state="All", deleted="False" )
    columns = [
        NameColumn( "Name",
                    key="name",
                    link=( lambda item: iff( item.deleted, None, dict( operation="manage_request", id=item.id ) ) ),
                    attach_popup=True,
                    filterable="advanced" ),
        DescriptionColumn( "Description",
                           key='desc',
                           filterable="advanced" ),
        SamplesColumn( "Samples",
                       link=( lambda item: iff( item.deleted, None, dict( operation="manage_request", id=item.id ) ) ) ),
        TypeColumn( "Sequencer",
                    link=( lambda item: iff( item.deleted, None, dict( operation="view_type", id=item.type.id ) ) ) ),
        grids.GridColumn( "Last Updated", key="update_time", format=time_ago ),
        grids.DeletedColumn( "Deleted",
                             key="deleted",
                             visible=False,
                             filterable="advanced" ),
        StateColumn( "State",
                     key='state',
                     filterable="advanced",
                     link=( lambda item: iff( item.deleted, None, dict( operation="request_events", id=item.id ) ) )
                   )
    ]
    # Hidden column that searches the name and description columns together.
    columns.append( grids.MulticolFilterColumn( "Search",
                                                cols_to_filter=[ columns[0], columns[1] ],
                                                key="free-text-search",
                                                visible=False,
                                                filterable="standard" ) )
    operations = [
        grids.GridOperation( "Submit",
                             allow_multiple=False,
                             condition=( lambda item: not item.deleted and item.is_unsubmitted and item.samples ),
                             confirm="Samples cannot be added to this request after it is submitted. Click OK to submit." )
    ]
class RequestsCommon( BaseController, UsesFormDefinitionWidgets ):
@web.json
def sample_state_updates( self, trans, ids=None, states=None ):
    """
    Report which of the given samples have changed state.

    `ids` is a comma-separated string of integer sample ids and `states` a
    comma-separated string of the state names the caller currently displays
    for them, in the same order. Returns a dict keyed by sample id holding,
    for every sample whose current state name differs from the supplied one,
    the new state name, the number of datasets and freshly rendered HTML for
    the state and dataset cells. Returns an empty dict when either argument
    is missing.
    """
    # Avoid caching
    trans.response.headers['Pragma'] = 'no-cache'
    trans.response.headers['Expires'] = '0'
    # Create new HTML for any that have changed
    rval = {}
    if ids is None or states is None:
        return rval
    sample_ids = [ int( sample_id ) for sample_id in ids.split( "," ) ]
    known_states = states.split( "," )
    for sample_id, known_state in zip( sample_ids, known_states ):
        sample = trans.sa_session.query( self.app.model.Sample ).get( sample_id )
        if sample is None:
            # The id no longer matches a row; skip it rather than failing the
            # whole update with an AttributeError.
            continue
        if sample.state.name != known_state:
            rval[ sample_id ] = { "state": sample.state.name,
                                  "datasets": len( sample.datasets ),
                                  "html_state": unicode( trans.fill_template( "requests/common/sample_state.mako",
                                                                              sample=sample ),
                                                         'utf-8' ),
                                  "html_datasets": unicode( trans.fill_template( "requests/common/sample_datasets.mako",
                                                                                 sample=sample ),
                                                            'utf-8' ) }
    return rval
@web.expose
@web.require_login( "create sequencing requests" )
def create_request( self, trans, cntrller, **kwd ):
    """
    Render the "create request" form and handle its submission.

    When 'create_request_button' or 'add_sample_button' is posted the form is
    validated (a user must be chosen when an admin submits, the user must
    exist, a name is required). On success the request is saved and the
    browser is either redirected to the request list or handed on to the
    add-sample handling. Otherwise the form is rendered again with the
    widgets for the selected request type and any error message.
    """
    params = util.Params( kwd )
    is_admin = cntrller == 'requests_admin' and trans.user_is_admin()
    message = util.restore_text( params.get( 'message', '' ) )
    status = params.get( 'status', 'done' )
    request_type_id = params.get( 'request_type_id', 'none' )
    if request_type_id != 'none':
        request_type = trans.sa_session.query( trans.model.RequestType ).get( trans.security.decode_id( request_type_id ) )
    else:
        request_type = None
    # user_id will not be 'none' if an admin user is submitting this request on behalf of another user
    # and they selected that user's id from the user_id SelectField.
    user_id_encoded = True
    user_id = params.get( 'user_id', 'none' )
    if user_id != 'none':
        try:
            user = trans.sa_session.query( trans.model.User ).get( trans.security.decode_id( user_id ) )
        except TypeError:
            # We must have an email address rather than an encoded user id
            # This is because the galaxy.base.js creates a search+select box
            # when there are more than 20 items in a selectfield
            user = trans.sa_session.query( trans.model.User ) \
                                   .filter( trans.model.User.table.c.email==util.restore_text( user_id ) ) \
                                   .first()
            user_id_encoded = False
    elif not is_admin:
        user = trans.user
    else:
        user = None
    create_clicked = params.get( 'create_request_button', False )
    add_sample_clicked = params.get( 'add_sample_button', False )
    if create_clicked or add_sample_clicked:
        name = util.restore_text( params.get( 'name', '' ) )
        if is_admin and user_id == 'none':
            message = 'Select the user on behalf of whom you are submitting this request.'
            status = 'error'
        elif user is None:
            message = 'Invalid user ID (%s)' % str(user_id)
            status = 'error'
        elif not name:
            message = 'Enter the name of the request.'
            status = 'error'
        else:
            save_kwd = dict( kwd )
            if is_admin and not user_id_encoded:
                # __save_request decodes 'user_id' as an encoded id; hand it the
                # encoded id of the user that was resolved from the email address
                # so the save does not fail on the raw email.
                save_kwd[ 'user_id' ] = trans.security.encode_id( user.id )
            request = self.__save_request( trans, cntrller, **save_kwd )
            message = 'The request has been created.'
            if create_clicked:
                return trans.response.send_redirect( web.url_for( controller=cntrller,
                                                                  action='browse_requests',
                                                                  message=message ,
                                                                  status='done' ) )
            elif add_sample_clicked:
                return self.__add_sample( trans, cntrller, request, **kwd )
    request_type_select_field = self.__build_request_type_id_select_field( trans, selected_value=request_type_id )
    # Widgets to be rendered on the request form
    widgets = []
    show_form = request_type is not None or status == 'error'
    if show_form:
        # Either the user selected a request_type or an error exists on the form.
        widgets.append( dict( label='Name of the Experiment',
                              widget=TextField( 'name', 40, util.restore_text( params.get( 'name', '' ) ) ),
                              helptext='(Required)') )
        widgets.append( dict( label='Description',
                              widget=TextField( 'desc', 40, util.restore_text( params.get( 'desc', '' ) )),
                              helptext='(Optional)') )
        if request_type is not None:
            widgets += request_type.request_form.get_widgets( user, **kwd )
    # In case there is an error on the form, make sure to populate widget fields with anything the user
    # may have already entered.
    self.populate_widgets_from_kwd( trans, widgets, **kwd )
    if show_form and is_admin:
        # The user selector is prepended after the widgets were populated from kwd.
        if user_id_encoded:
            selected_user_id = user_id
        elif user is not None:
            selected_user_id = trans.security.encode_id( user.id )
        else:
            # The email address did not match any user; fall back to the
            # "nothing selected" value instead of dereferencing None.
            selected_user_id = 'none'
        user_widget = dict( label='Select user',
                            widget=self.__build_user_id_select_field( trans, selected_value=selected_user_id ),
                            helptext='Submit the request on behalf of the selected user (Required)')
        widgets = [ user_widget ] + widgets
    return trans.fill_template( '/requests/common/create_request.mako',
                                cntrller=cntrller,
                                request_type_select_field=request_type_select_field,
                                request_type_select_field_selected=request_type_id,
                                widgets=widgets,
                                message=message,
                                status=status )
@web.expose
@web.require_login( "edit sequencing requests" )
def edit_basic_request_info( self, trans, cntrller, **kwd ):
    """
    Render the form for editing a request's name, description and
    request-type form fields, saving the changes when
    'edit_basic_request_info_button' or 'edit_samples_button' is posted.

    Redirects through invalid_id_redirect when the 'id' parameter cannot be
    decoded or does not match a request.
    """
    params = util.Params( kwd )
    message = util.restore_text( params.get( 'message', '' ) )
    status = params.get( 'status', 'done' )
    request_id = params.get( 'id', None )
    try:
        request = trans.sa_session.query( trans.model.Request ).get( trans.security.decode_id( request_id ) )
    except Exception:
        request = None
    if request is None:
        # Either the id could not be decoded or no request has that id.
        return invalid_id_redirect( trans, cntrller, request_id )
    if params.get( 'edit_basic_request_info_button', False ) or params.get( 'edit_samples_button', False ):
        name = util.restore_text( params.get( 'name', '' ) )
        if not name:
            status = 'error'
            message = 'Enter the name of the request'
        else:
            request = self.__save_request( trans, cntrller, request=request, **kwd )
            message = 'The changes made to request (%s) have been saved.' % request.name
    # Widgets to be rendered on the request form
    widgets = [ dict( label='Name',
                      widget=TextField( 'name', 40, request.name ),
                      helptext='(Required)' ),
                dict( label='Description',
                      widget=TextField( 'desc', 40, request.desc ),
                      helptext='(Optional)' ) ]
    widgets = widgets + request.type.request_form.get_widgets( request.user, request.values.content, **kwd )
    # In case there is an error on the form, make sure to populate widget fields with anything the user
    # may have already entered.
    self.populate_widgets_from_kwd( trans, widgets, **kwd )
    return trans.fill_template( 'requests/common/edit_basic_request_info.mako',
                                cntrller=cntrller,
                                request_type=request.type,
                                request=request,
                                widgets=widgets,
                                message=message,
                                status=status )
def __save_request( self, trans, cntrller, request=None, **kwd ):
    """
    Create a new request (when `request` is None) or store changes to the
    given one, using the form values posted in `kwd`. Returns the request.
    """
    params = util.Params( kwd )
    request_type_id = params.get( 'request_type_id', None )
    is_admin = cntrller == 'requests_admin' and trans.user_is_admin()
    creating = request is None
    if creating:
        # A new request needs its request_type looked up from the posted id
        request_type = trans.sa_session.query( trans.model.RequestType ).get( trans.security.decode_id( request_type_id ) )
        if is_admin:
            # An admin creates the request on behalf of the selected user
            user_id = params.get( 'user_id', '' )
            user = trans.sa_session.query( trans.model.User ).get( trans.security.decode_id( user_id ) )
        else:
            user = trans.user
    else:
        # An existing request keeps its owner and type
        user = request.user
        request_type = request.type
    name = util.restore_text( params.get( 'name', '' ) )
    desc = util.restore_text( params.get( 'desc', '' ) )
    # NOTE(review): this default notification is also assigned when an existing
    # request is saved below, which replaces whatever notification it had --
    # confirm that this is intended.
    notification = dict( email=[ user.email ], sample_states=[ request_type.state.id ], body='', subject='' )
    # Collect one value per field of the request form, in field order
    values = []
    for index, field in enumerate( request_type.request_form.fields ):
        field_type = field[ 'type' ]
        posted_value = params.get( 'field_%i' % index, '' )
        if field_type == 'AddressField':
            address_choice = util.restore_text( posted_value )
            if address_choice == 'new':
                # Store the newly entered address with this user's addresses
                user_address = trans.model.UserAddress( user=user )
                self.save_widget_field( trans, user_address, index, **kwd )
                trans.sa_session.refresh( user )
                stored_value = int( user_address.id )
            elif address_choice in [ '', 'none', 'None', None ]:
                stored_value = ''
            else:
                stored_value = int( address_choice )
        elif field_type == 'CheckboxField':
            stored_value = CheckboxField.is_checked( posted_value )
        else:
            stored_value = util.restore_text( posted_value )
        values.append( stored_value )
    form_values = trans.model.FormValues( request_type.request_form, values )
    trans.sa_session.add( form_values )
    trans.sa_session.flush()
    if creating:
        request = trans.model.Request( name, desc, request_type, user, form_values, notification )
        trans.sa_session.add( request )
        trans.sa_session.flush()
        trans.sa_session.refresh( request )
        # Record the initial 'New' state, noting who created the request for whom
        if request.user != trans.user:
            sample_event_comment = "Request created by user %s for user %s." % ( trans.user.email, request.user.email )
        else:
            sample_event_comment = "Request created."
        event = trans.model.RequestEvent( request, request.states.NEW, sample_event_comment )
        trans.sa_session.add( event )
        trans.sa_session.flush()
    else:
        # Overwrite the stored attributes of the existing request
        request.name = name
        request.desc = desc
        request.type = request_type
        request.user = user
        request.notification = notification
        request.values = form_values
        trans.sa_session.add( request )
        trans.sa_session.flush()
    return request
@web.expose
@web.require_login( "submit sequencing requests" )
def submit_request( self, trans, cntrller, **kwd ):
    """
    Submit a request: record a SUBMITTED request event, create an event
    putting every sample into the first state of the request type, send the
    configured email notification and redirect to the request list.

    A request with no samples, or one for which __validate_request returns a
    message, is not submitted; the browser is redirected back to the edit
    page with that message as an error.
    """
    params = util.Params( kwd )
    request_id = params.get( 'id', None )
    message = util.restore_text( params.get( 'message', '' ) )
    status = util.restore_text( params.get( 'status', 'done' ) )
    try:
        request = trans.sa_session.query( trans.model.Request ).get( trans.security.decode_id( request_id ) )
    except Exception:
        request = None
    if request is None:
        # Either the id could not be decoded or no request has that id.
        return invalid_id_redirect( trans, cntrller, request_id )
    if not request.samples:
        message = 'Add at least 1 sample to this request before submitting.'
    else:
        message = self.__validate_request( trans, cntrller, request )
    if message:
        return trans.response.send_redirect( web.url_for( controller='requests_common',
                                                          action='edit_basic_request_info',
                                                          cntrller=cntrller,
                                                          id = request_id,
                                                          status='error',
                                                          message=message ) )
    # Change the request state to 'Submitted'
    # Compare the users themselves: comparing the owner's email string with the
    # user object by identity was always true, so every submission was recorded
    # as made "on behalf of" the owner.
    if request.user != trans.user:
        sample_event_comment = "Request submitted by %s on behalf of %s." % ( trans.user.email, request.user.email )
    else:
        sample_event_comment = ""
    event = trans.model.RequestEvent( request, request.states.SUBMITTED, sample_event_comment )
    trans.sa_session.add( event )
    # change the state of each of the samples of this request
    new_state = request.type.states[0]
    for sample in request.samples:
        event = trans.model.SampleEvent( sample, new_state, 'Samples created.' )
        trans.sa_session.add( event )
    trans.sa_session.add( request )
    trans.sa_session.flush()
    request.send_email_notification( trans, new_state )
    message = 'The request has been submitted.'
    return trans.response.send_redirect( web.url_for( controller=cntrller,
                                                      action='browse_requests',
                                                      cntrller=cntrller,
                                                      id=request_id,
                                                      status=status,
                                                      message=message ) )
@web.expose
@web.require_login( "sequencing request page" )
def manage_request( self, trans, cntrller, **kwd ):
    """
    Render the request management page and dispatch the buttons posted from
    it: importing, adding, saving and editing samples, changing the state of
    the selected samples, and changing their data library and folder.

    Redirects through invalid_id_redirect when the request id, or the
    library / folder id posted with 'change_lib_button', cannot be decoded.
    """
    params = util.Params( kwd )
    is_admin = cntrller == 'requests_admin' and trans.user_is_admin()
    message = util.restore_text( params.get( 'message', '' ) )
    status = params.get( 'status', 'done' )
    request_id = params.get( 'id', None )
    try:
        request = trans.sa_session.query( trans.model.Request ).get( trans.security.decode_id( request_id ) )
    except:
        return invalid_id_redirect( trans, cntrller, request_id )
    sample_state_id = params.get( 'sample_state_id', None )
    # Get the user entered sample information
    current_samples, managing_samples, libraries = self.__get_sample_info( trans, request, **kwd )
    selected_samples = self.__get_selected_samples( trans, request, **kwd )
    selected_value = params.get( 'sample_operation', 'none' )
    if selected_value != 'none' and not selected_samples:
        message = 'Select at least one sample before selecting an operation.'
        status = 'error'
        return trans.response.send_redirect( web.url_for( controller='requests_common',
                                                          action='manage_request',
                                                          cntrller=cntrller,
                                                          id=request_id,
                                                          status=status,
                                                          message=message ) )
    sample_operation_select_field = self.__build_sample_operation_select_field( trans, is_admin, request, selected_value )
    if params.get( 'import_samples_button', False ):
        # Import sample field values from a csv file
        return self.__import_samples( trans, cntrller, request, current_samples, libraries, **kwd )
    elif params.get( 'add_sample_button', False ):
        return self.__add_sample( trans, cntrller, request, **kwd )
    elif params.get( 'save_samples_button', False ):
        return self.__save_sample( trans, cntrller, request, current_samples, **kwd )
    elif params.get( 'edit_samples_button', False ):
        managing_samples = True
    elif params.get( 'cancel_changes_button', False ):
        return trans.response.send_redirect( web.url_for( controller='requests_common',
                                                          action='manage_request',
                                                          cntrller=cntrller,
                                                          id=request_id ) )
    elif params.get( 'change_state_button', False ):
        sample_event_comment = util.restore_text( params.get( 'sample_event_comment', '' ) )
        new_state = trans.sa_session.query( trans.model.SampleState ).get( trans.security.decode_id( sample_state_id ) )
        for sample_id in selected_samples:
            sample = trans.sa_session.query( trans.model.Sample ).get( trans.security.decode_id( sample_id ) )
            event = trans.model.SampleEvent( sample, new_state, sample_event_comment )
            trans.sa_session.add( event )
            trans.sa_session.flush()
        # Let update_request_state derive the request's state from its samples
        return trans.response.send_redirect( web.url_for( controller='requests_common',
                                                          cntrller=cntrller,
                                                          action='update_request_state',
                                                          request_id=request_id ) )
    elif params.get( 'cancel_change_state_button', False ):
        return trans.response.send_redirect( web.url_for( controller='requests_common',
                                                          action='manage_request',
                                                          cntrller=cntrller,
                                                          id=request_id ) )
    elif params.get( 'change_lib_button', False ):
        library_id = params.get( 'sample_0_library_id', None )
        try:
            library = trans.sa_session.query( trans.model.Library ).get( trans.security.decode_id( library_id ) )
        except:
            # The redirect has to be returned; otherwise execution carries on
            # with 'library' unbound.
            return invalid_id_redirect( trans, cntrller, library_id )
        folder_id = params.get( 'sample_0_folder_id', None )
        try:
            folder = trans.sa_session.query( trans.model.LibraryFolder ).get( trans.security.decode_id( folder_id ) )
        except:
            return invalid_id_redirect( trans, cntrller, folder_id )
        for sample_id in selected_samples:
            sample = trans.sa_session.query( trans.model.Sample ).get( trans.security.decode_id( sample_id ) )
            sample.library = library
            sample.folder = folder
            trans.sa_session.add( sample )
            trans.sa_session.flush()
        trans.sa_session.refresh( request )
        message = 'Changes made to the selected samples have been saved. '
        return trans.response.send_redirect( web.url_for( controller='requests_common',
                                                          action='manage_request',
                                                          cntrller=cntrller,
                                                          id=request_id,
                                                          status=status,
                                                          message=message ) )
    elif params.get( 'cancel_change_lib_button', False ):
        return trans.response.send_redirect( web.url_for( controller='requests_common',
                                                          action='manage_request',
                                                          cntrller=cntrller,
                                                          id=trans.security.encode_id( request.id ) ) )
    # No button required a redirect: build the widgets and render the page
    request_widgets = self.__get_request_widgets( trans, request.id )
    sample_copy = self.__build_copy_sample_select_field( trans, current_samples )
    libraries_select_field, folders_select_field = self.__build_library_and_folder_select_fields( trans,
                                                                                                 request.user,
                                                                                                 0,
                                                                                                 libraries,
                                                                                                 None,
                                                                                                 **kwd )
    # Build the sample_state_id_select_field SelectField
    sample_state_id_select_field = self.__build_sample_state_id_select_field( trans, request, sample_state_id )
    return trans.fill_template( '/requests/common/manage_request.mako',
                                cntrller=cntrller,
                                request=request,
                                selected_samples=selected_samples,
                                request_widgets=request_widgets,
                                current_samples=current_samples,
                                sample_copy=sample_copy,
                                libraries=libraries,
                                sample_operation_select_field=sample_operation_select_field,
                                libraries_select_field=libraries_select_field,
                                folders_select_field=folders_select_field,
                                sample_state_id_select_field=sample_state_id_select_field,
                                managing_samples=managing_samples,
                                status=status,
                                message=message )
@web.expose
@web.require_login( "delete sequencing requests" )
def delete_request( self, trans, cntrller, **kwd ):
    """
    Mark each request named in the 'id' parameter, and all of its samples,
    as deleted, then redirect to the request list with a summary message.

    Ids that cannot be decoded or that match no request are reported in the
    message and set the status to 'error'; the remaining ids are still
    processed.
    """
    params = util.Params( kwd )
    id_list = util.listify( kwd.get( 'id', '' ) )
    message = util.restore_text( params.get( 'message', '' ) )
    status = util.restore_text( params.get( 'status', 'done' ) )
    num_deleted = 0
    for encoded_id in id_list:
        try:
            request = trans.sa_session.query( trans.model.Request ).get( trans.security.decode_id( encoded_id ) )
        except Exception:
            request = None
        if request is None:
            # Undecodable id, or a well-formed id that matches no request
            message += "Invalid request ID (%s). " % str( encoded_id )
            status = 'error'
            continue
        request.deleted = True
        trans.sa_session.add( request )
        # delete all the samples belonging to this request
        for s in request.samples:
            s.deleted = True
            trans.sa_session.add( s )
        trans.sa_session.flush()
        num_deleted += 1
    message += '%i requests have been deleted.' % num_deleted
    return trans.response.send_redirect( web.url_for( controller=cntrller,
                                                      action='browse_requests',
                                                      status=status,
                                                      message=message ) )
@web.expose
@web.require_login( "undelete sequencing requests" )
def undelete_request( self, trans, cntrller, **kwd ):
    """
    Clear the deleted flag on each request named in the 'id' parameter and
    on all of its samples, then redirect to the request list with a summary
    message.

    Ids that cannot be decoded or that match no request are reported in the
    message and set the status to 'error'; the remaining ids are still
    processed.
    """
    params = util.Params( kwd )
    id_list = util.listify( kwd.get( 'id', '' ) )
    message = util.restore_text( params.get( 'message', '' ) )
    status = util.restore_text( params.get( 'status', 'done' ) )
    num_undeleted = 0
    for encoded_id in id_list:
        try:
            request = trans.sa_session.query( trans.model.Request ).get( trans.security.decode_id( encoded_id ) )
        except Exception:
            request = None
        if request is None:
            # Undecodable id, or a well-formed id that matches no request
            message += "Invalid request ID (%s). " % str( encoded_id )
            status = 'error'
            continue
        request.deleted = False
        trans.sa_session.add( request )
        # undelete all the samples belonging to this request
        for s in request.samples:
            s.deleted = False
            trans.sa_session.add( s )
        trans.sa_session.flush()
        num_undeleted += 1
    message += '%i requests have been undeleted.' % num_undeleted
    return trans.response.send_redirect( web.url_for( controller=cntrller,
                                                      action='browse_requests',
                                                      status=status,
                                                      message=message ) )
@web.expose
@web.require_login( "sequencing request events" )
def request_events( self, trans, cntrller, **kwd ):
    """
    Show the event history of the request identified by the 'id' parameter
    as a list of ( state, age of the event, comment ) tuples.
    """
    request_id = util.Params( kwd ).get( 'id', None )
    try:
        decoded_id = trans.security.decode_id( request_id )
        request = trans.sa_session.query( trans.model.Request ).get( decoded_id )
    except:
        return invalid_id_redirect( trans, cntrller, request_id )
    # One row per event, in the order the request lists them
    events_list = [ ( event.state, time_ago( event.update_time ), event.comment )
                    for event in request.events ]
    return trans.fill_template( '/requests/common/events.mako',
                                cntrller=cntrller,
                                events_list=events_list,
                                request=request )
@web.expose
@web.require_login( "edit email notification settings" )
def edit_email_settings( self, trans, cntrller, **kwd ):
    """
    Allow for changing the email notification settings where email is sent to a list of users
    whenever the request state changes to one selected for notification.

    The recipients are the request owner (when the 'email_address' checkbox
    is checked) plus the addresses entered one per line in
    'additional_email_addresses'. If any address fails validation nothing is
    stored and the error is reported; if no address is given at all the
    notification settings are cleared. Always redirects back to the request
    edit page.
    """
    params = util.Params( kwd )
    message = util.restore_text( params.get( 'message', '' ) )
    status = params.get( 'status', 'done' )
    request_id = params.get( 'id', None )
    try:
        request = trans.sa_session.query( trans.model.Request ).get( trans.security.decode_id( request_id ) )
    except:
        return invalid_id_redirect( trans, cntrller, request_id )
    notify_owner = CheckboxField.is_checked( params.get( 'email_address', '' ) )
    # Blank lines (for example a trailing newline in the textarea) are not addresses
    additional_email_addresses = []
    posted_addresses = params.get( 'additional_email_addresses', '' )
    if posted_addresses:
        for line in posted_addresses.split( '\r\n' ):
            if line.strip():
                additional_email_addresses.append( util.restore_text( line ) )
    # Get the list of checked sample state CheckBoxFields
    checked_sample_states = []
    for sample_state in request.type.states:
        if CheckboxField.is_checked( params.get( 'sample_state_%i' % sample_state.id, '' ) ):
            checked_sample_states.append( sample_state.id )
    settings_changed = True
    if notify_owner or additional_email_addresses:
        # The user added 1 or more email addresses
        email_addresses = []
        if notify_owner:
            email_addresses.append( request.user.email )
        email_addresses.extend( additional_email_addresses )
        # Make sure email addresses are valid
        err_msg = ''
        for address in email_addresses:
            err_msg += self.__validate_email( address )
        if err_msg:
            status = 'error'
            message += err_msg
            # Nothing is stored in this case, so do not claim below that it was
            settings_changed = False
        else:
            request.notification = dict( email=email_addresses,
                                         sample_states=checked_sample_states,
                                         body='',
                                         subject='' )
    else:
        # The user may have eliminated email addresses that were previously set
        request.notification = None
        if checked_sample_states:
            message = 'All sample states have been unchecked since no email addresses have been selected or entered. '
    if settings_changed:
        trans.sa_session.add( request )
        trans.sa_session.flush()
        trans.sa_session.refresh( request )
        message += 'The changes made to the email notification settings have been saved.'
    return trans.response.send_redirect( web.url_for( controller='requests_common',
                                                      action='edit_basic_request_info',
                                                      cntrller=cntrller,
                                                      id=request_id,
                                                      message=message ,
                                                      status=status ) )
@web.expose
@web.require_login( "update sequencing request state" )
def update_request_state( self, trans, cntrller, **kwd ):
    """
    Bring the state of the request identified by 'request_id' in line with
    the states of its samples, then redirect to the request's manage page.

    Without a common sample state, a request that is complete gets a
    SUBMITTED event; otherwise nothing is recorded. With a common sample
    state, a COMPLETE event is recorded when that state is the request
    type's `state`, a SUBMITTED event otherwise, and the request's email
    notification is invoked.
    """
    params = util.Params( kwd )
    message = params.get( 'message', '' )
    status = params.get( 'status', 'done' )
    request_id = params.get( 'request_id', None )
    try:
        decoded_id = trans.security.decode_id( request_id )
        request = trans.sa_session.query( trans.model.Request ).get( decoded_id )
    except:
        return invalid_id_redirect( trans, cntrller, request_id )
    # The state shared by every sample of the request, if there is one
    common_state = request.samples_have_common_state
    if not common_state:
        if request.is_complete:
            # A sample left the final sample state, so a completed request
            # goes back to the submitted state
            message = "At least 1 sample state moved from the final sample state, so now the request is in the '%s' state" % request.states.SUBMITTED
            trans.sa_session.add( trans.model.RequestEvent( request, request.states.SUBMITTED, message ) )
            trans.sa_session.flush()
        return trans.response.send_redirect( web.url_for( controller='requests_common',
                                                          action='manage_request',
                                                          cntrller=cntrller,
                                                          id=request_id,
                                                          status=status,
                                                          message=message ) )
    last_sample_state = request.type.state
    final_state = common_state.id == last_sample_state.id
    if final_state:
        # Every sample reached the last sample state: the request is complete
        comments = "All samples of this request are in the last sample state (%s). " % last_sample_state.name
        new_request_state = request.states.COMPLETE
    else:
        comments = "All samples are in %s state. " % common_state.name
        new_request_state = request.states.SUBMITTED
    trans.sa_session.add( trans.model.RequestEvent(request, new_request_state, comments) )
    trans.sa_session.flush()
    # Send the email notification if one is configured for this sample state
    notification_result = request.send_email_notification( trans, common_state, final_state )
    message = comments
    if notification_result:
        message = comments + notification_result
    return trans.response.send_redirect( web.url_for( controller='requests_common',
                                                      action='manage_request',
                                                      cntrller=cntrller,
                                                      id=trans.security.encode_id(request.id),
                                                      status='done',
                                                      message=message ) )
def __save_sample( self, trans, cntrller, request, current_samples, **kwd ):
    """
    Save the samples entered on the manage-request page.

    `current_samples` is a list of dicts read here through the keys 'name',
    'barcode', 'library', 'folder' and 'field_values'. The entries beyond
    len( request.samples ) are treated as new, unsaved samples.

    The new entries are first checked for empty and duplicate names; on
    failure the manage-request page is rendered again with an error. Then,
    unless 'managing_samples' is set, the new entries are stored as Sample
    rows; when it is set, every existing sample is updated from its entry
    instead. Finally the browser is redirected to the manage-request page.

    NOTE(review): the nesting of this method was reconstructed from a copy of
    the file that had lost its indentation -- confirm the placement of the
    statements flagged below.
    """
    # Save all the new/unsaved samples entered by the user
    params = util.Params( kwd )
    message = util.restore_text( params.get( 'message', '' ) )
    status = params.get( 'status', 'done' )
    managing_samples = util.string_as_bool( params.get( 'managing_samples', False ) )
    is_admin = cntrller == 'requests_admin' and trans.user_is_admin()
    selected_value = params.get( 'sample_operation', 'none' )
    # Check for duplicate sample names
    # (the message read from the request parameters above is discarded here)
    message = ''
    for index in range( len( current_samples ) - len( request.samples ) ):
        # Only the entries that are not yet saved are validated
        sample_index = index + len( request.samples )
        current_sample = current_samples[ sample_index ]
        sample_name = current_sample[ 'name' ]
        if not sample_name.strip():
            message = 'Enter the name of sample number %i' % sample_index
            break
        # Count the name across all entries, saved and unsaved
        count = 0
        for i in range( len( current_samples ) ):
            if sample_name == current_samples[ i ][ 'name' ]:
                count += 1
        if count > 1:
            message = "This request has %i samples with the name (%s). Samples belonging to a request must have unique names." % ( count, sample_name )
            break
    if message:
        # Validation failed: show the page again with the entered values and the error
        selected_samples = self.__get_selected_samples( trans, request, **kwd )
        request_widgets = self.__get_request_widgets( trans, request.id )
        sample_copy = self.__build_copy_sample_select_field( trans, current_samples )
        sample_operation_select_field = self.__build_sample_operation_select_field( trans, is_admin, request, selected_value )
        status = 'error'
        return trans.fill_template( '/requests/common/manage_request.mako',
                                    cntrller=cntrller,
                                    request=request,
                                    selected_samples=selected_samples,
                                    request_widgets=request_widgets,
                                    current_samples=current_samples,
                                    sample_copy=sample_copy,
                                    managing_samples=managing_samples,
                                    sample_operation_select_field=sample_operation_select_field,
                                    status=status,
                                    message=message )
    if not managing_samples:
        for index in range( len( current_samples ) - len( request.samples ) ):
            # NOTE(review): the index is re-read from len( request.samples ) on every
            # pass, so this presumably relies on request.samples growing as each new
            # Sample is created for the request -- confirm.
            sample_index = len( request.samples )
            current_sample = current_samples[ sample_index ]
            form_values = trans.model.FormValues( request.type.sample_form, current_sample[ 'field_values' ] )
            trans.sa_session.add( form_values )
            trans.sa_session.flush()
            s = trans.model.Sample( current_sample[ 'name' ],
                                    '',
                                    request,
                                    form_values,
                                    current_sample[ 'barcode' ],
                                    current_sample[ 'library' ],
                                    current_sample[ 'folder' ] )
            trans.sa_session.add( s )
            trans.sa_session.flush()
    else:
        message = 'Changes made to the samples are saved. '
        for sample_index in range( len( current_samples ) ):
            sample = request.samples[ sample_index ]
            current_sample = current_samples[ sample_index ]
            sample.name = current_sample[ 'name' ]
            sample.library = current_sample[ 'library' ]
            sample.folder = current_sample[ 'folder' ]
            if request.is_submitted:
                # Barcodes are only validated and stored once the request is submitted
                bc_message = self.__validate_barcode( trans, sample, current_sample[ 'barcode' ] )
                if bc_message:
                    status = 'error'
                    message += bc_message
                else:
                    if not sample.bar_code:
                        # If this is a 'new' (still in its first state) sample
                        # change the state to the next
                        if sample.state.id == request.type.states[0].id:
                            event = trans.model.SampleEvent( sample,
                                                             request.type.states[1],
                                                             'Sample added to the system' )
                            trans.sa_session.add( event )
                            trans.sa_session.flush()
                            # Now check if all the samples' barcode has been entered.
                            # If yes then send notification email if configured
                            # NOTE(review): placement of this check inside the state
                            # change branch is a reconstruction -- confirm.
                            common_state = request.samples_have_common_state
                            if common_state:
                                if common_state.id == request.type.states[1].id:
                                    event = trans.model.RequestEvent( request,
                                                                      request.states.SUBMITTED,
                                                                      "All samples are in %s state." % common_state.name )
                                    trans.sa_session.add( event )
                                    trans.sa_session.flush()
                                    request.send_email_notification( trans, request.type.states[1] )
                    # NOTE(review): assumed to run for every barcode that passed
                    # validation, not only for first-time barcodes -- confirm.
                    sample.bar_code = current_samples[sample_index]['barcode']
            trans.sa_session.add( sample )
            trans.sa_session.flush()
            # Replace the stored form field values of the sample with the entered ones
            form_values = trans.sa_session.query( trans.model.FormValues ).get( sample.values.id )
            form_values.content = current_sample[ 'field_values' ]
            trans.sa_session.add( form_values )
            trans.sa_session.flush()
    return trans.response.send_redirect( web.url_for( controller='requests_common',
                                                      action='manage_request',
                                                      cntrller=cntrller,
                                                      id=trans.security.encode_id( request.id ),
                                                      status=status,
                                                      message=message ) )
@web.expose
@web.require_login( "find samples" )
def find_samples( self, trans, cntrller, **kwd ):
    """
    Render the sample search page and, when the search form has been
    submitted, list the samples that match the search string.

    The search is a case-insensitive substring match on the sample bar code,
    the sample name or the name of one of the sample's datasets, depending on
    the 'search_type' parameter.  Only samples that are not deleted, whose
    request is not deleted and whose request state is one of the submitted
    'request_states' are listed; a non-admin user additionally only sees
    samples of requests that belong to them.
    """
    params = util.Params( kwd )
    # Neither of these two values is handed to the template below.
    message = util.restore_text( params.get( 'message', '' ) )
    status = params.get( 'status', 'done' )
    is_admin = cntrller == 'requests_admin' and trans.user_is_admin()
    samples_list = []
    results = ''
    if params.get( 'find_samples_button', False ):
        search_string = kwd.get( 'search_box', '' )
        search_type = params.get( 'search_type', '' )
        request_states = util.listify( params.get( 'request_states', '' ) )
        samples = []
        if search_type in ( 'barcode', 'sample name', 'dataset' ):
            sample_table = trans.model.Sample.table
            like_pattern = "%" + search_string.lower() + "%"
            # Clauses specific to the kind of search; all searches share the
            # 'not deleted' clause and the ordering.
            if search_type == 'barcode':
                match_clauses = [ func.lower( sample_table.c.bar_code ).like( like_pattern ) ]
            elif search_type == 'sample name':
                match_clauses = [ func.lower( sample_table.c.name ).like( like_pattern ) ]
            else:
                dataset_table = trans.model.SampleDataset.table
                match_clauses = [ dataset_table.c.sample_id==sample_table.c.id,
                                  func.lower( dataset_table.c.name ).like( like_pattern ) ]
            samples = trans.sa_session.query( trans.model.Sample ) \
                                      .filter( and_( sample_table.c.deleted==False, *match_clauses ) ) \
                                      .order_by( sample_table.c.create_time.desc() )
        for candidate in samples:
            owning_request = candidate.request
            # Non-admin users may only see samples of their own requests
            if not is_admin and owning_request.user.id != trans.user.id:
                continue
            if owning_request.deleted or owning_request.state not in request_states:
                continue
            samples_list.append( candidate )
        results = 'There are %i samples matching the search parameters.' % len( samples_list )
    # Build the request_states SelectField
    selected_states = kwd.get( 'request_states', trans.model.Request.states.SUBMITTED )
    all_states = [ state for key, state in trans.model.Request.states.items() ]
    request_states = build_select_field( trans,
                                         all_states,
                                         'self',
                                         'request_states',
                                         selected_value=selected_states,
                                         refresh_on_change=False,
                                         multiple=True,
                                         display='checkboxes' )
    # Build the search_type SelectField
    search_type = build_select_field( trans,
                                      [ 'sample name', 'barcode', 'dataset' ],
                                      'self',
                                      'search_type',
                                      selected_value=kwd.get( 'search_type', 'sample name' ),
                                      refresh_on_change=False )
    # Build the search_box TextField
    search_box = TextField( 'search_box', 50, kwd.get('search_box', '' ) )
    return trans.fill_template( '/requests/common/find_samples.mako',
                                cntrller=cntrller,
                                request_states=request_states,
                                samples=samples_list,
                                search_type=search_type,
                                results=results,
                                search_box=search_box )
@web.expose
@web.require_login( "sample events" )
def sample_events( self, trans, cntrller, **kwd ):
    """
    Display the list of events recorded for a sample.

    The sample is identified by the encoded 'sample_id' request parameter.
    If the id cannot be decoded, or no sample with that id exists, the user
    is redirected via invalid_id_redirect().  Otherwise the template receives
    one ( state name, state description, time since update, comment ) tuple
    per event.
    """
    params = util.Params( kwd )
    sample_id = params.get( 'sample_id', None )
    try:
        sample = trans.sa_session.query( trans.model.Sample ).get( trans.security.decode_id( sample_id ) )
    except Exception:
        # The id could not be decoded (missing or malformed)
        sample = None
    if sample is None:
        # Query.get() returns None rather than raising when no row matches,
        # so a well-formed id of a non-existent sample ends up here as well.
        return invalid_id_redirect( trans, cntrller, sample_id )
    events_list = [ ( event.state.name,
                      event.state.desc,
                      time_ago( event.update_time ),
                      event.comment ) for event in sample.events ]
    return trans.fill_template( '/requests/common/sample_events.mako',
                                cntrller=cntrller,
                                events_list=events_list,
                                sample=sample )
def __add_sample( self, trans, cntrller, request, **kwd ):
    """
    Render the manage_request page with one or more new, unsaved sample rows
    appended to the request's current list of samples.

    When the 'add_sample_button' parameter is present, 'num_sample_to_copy'
    rows are appended.  If 'copy_sample_index' selects one of the current
    samples, each new row copies that sample's library, folder and field
    values; otherwise each new row is empty.  Nothing is written to the
    database here.  Invalid values for either parameter add no rows and are
    reported on the page through status / message.
    """
    params = util.Params( kwd )
    message = util.restore_text( params.get( 'message', '' ) )
    status = params.get( 'status', 'done' )
    is_admin = cntrller == 'requests_admin' and trans.user_is_admin()
    # Get the widgets for rendering the request form
    request_widgets = self.__get_request_widgets( trans, request.id )
    current_samples, managing_samples, libraries = self.__get_sample_info( trans, request, **kwd )
    if params.get( 'add_sample_button', False ):
        try:
            num_samples_to_add = int( params.get( 'num_sample_to_copy', 1 ) )
            # See if the user has selected a sample to copy ( -1 means no ).
            copy_sample_index = int( params.get( 'copy_sample_index', -1 ) )
        except ( TypeError, ValueError ):
            # The values come straight from the form, so they may not be numbers
            num_samples_to_add = 0
            copy_sample_index = -1
            status = 'error'
            message = 'Enter a valid number of samples to add.'
        if copy_sample_index != -1 and not ( 0 <= copy_sample_index < len( current_samples ) ):
            num_samples_to_add = 0
            status = 'error'
            message = 'The sample selected for copying does not exist.'
        for index in range( num_samples_to_add ):
            # Form field names are zero-based, so the new row's index is the current number of rows.
            new_sample_index = len( current_samples )
            if copy_sample_index != -1:
                # The user has selected a sample to copy.
                source_sample = current_samples[ copy_sample_index ]
                library_id = source_sample[ 'library_select_field' ].get_selected( return_value=True )
                folder_id = source_sample[ 'folder_select_field' ].get_selected( return_value=True )
                name = source_sample[ 'name' ] + '_%i' % ( new_sample_index + 1 )
                # Copy the list so the new row does not share it with the source row
                field_values = list( source_sample[ 'field_values' ] )
            else:
                # The user has not selected a sample to copy (may just be adding a sample).
                library_id = None
                folder_id = None
                name = 'Sample_%i' % ( new_sample_index + 1 )
                field_values = [ '' for field in request.type.sample_form.fields ]
            # Build the library_select_field and folder_select_field for the new sample being added.
            library_select_field, folder_select_field = self.__build_library_and_folder_select_fields( trans,
                                                                                                       user=request.user,
                                                                                                       sample_index=new_sample_index,
                                                                                                       libraries=libraries,
                                                                                                       sample=None,
                                                                                                       library_id=library_id,
                                                                                                       folder_id=folder_id,
                                                                                                       **kwd )
            # Append the new sample to the current list of samples for the request
            current_samples.append( dict( name=name,
                                          barcode='',
                                          library=None,
                                          library_id=library_id,
                                          folder=None,
                                          folder_id=folder_id,
                                          field_values=field_values,
                                          library_select_field=library_select_field,
                                          folder_select_field=folder_select_field ) )
    selected_samples = self.__get_selected_samples( trans, request, **kwd )
    selected_value = params.get( 'sample_operation', 'none' )
    sample_operation_select_field = self.__build_sample_operation_select_field( trans, is_admin, request, selected_value )
    sample_copy = self.__build_copy_sample_select_field( trans, current_samples )
    return trans.fill_template( '/requests/common/manage_request.mako',
                                cntrller=cntrller,
                                request=request,
                                selected_samples=selected_samples,
                                request_widgets=request_widgets,
                                current_samples=current_samples,
                                sample_operation_select_field=sample_operation_select_field,
                                sample_copy=sample_copy,
                                managing_samples=managing_samples,
                                message=message,
                                status=status )
def __get_sample_info( self, trans, request, **kwd ):
    """
    Collect the sample rows to display for a request: first the samples
    already associated with the request, then the rows the user entered in
    the form ( the 'sample_<index>_*' parameters ).

    When 'managing_samples' is true the form rows start at index 0 and
    replace the rows of the saved samples; otherwise they start after the
    saved samples and are appended as new rows.

    Returns a ( current_samples, managing_samples, libraries ) tuple, where
    current_samples is a list of dictionaries, one per row.
    """
    params = util.Params( kwd )
    managing_samples = util.string_as_bool( params.get( 'managing_samples', False ) )
    # Get all data libraries accessible to this user
    libraries = request.user.accessible_libraries( trans, [ trans.app.security_agent.permitted_actions.LIBRARY_ADD ] )
    # Build the list of widgets which will be used to render each sample row on the request page
    current_samples = []
    for position, saved_sample in enumerate( request.samples ):
        saved_select_fields = self.__build_library_and_folder_select_fields( trans,
                                                                             request.user,
                                                                             position,
                                                                             libraries,
                                                                             saved_sample,
                                                                             **kwd )
        library_select_field, folder_select_field = saved_select_fields
        current_samples.append( dict( name=saved_sample.name,
                                      barcode=saved_sample.bar_code,
                                      library=saved_sample.library,
                                      folder=saved_sample.folder,
                                      field_values=saved_sample.values.content,
                                      library_select_field=library_select_field,
                                      folder_select_field=folder_select_field ) )
    if managing_samples:
        sample_index = 0
    else:
        sample_index = len( request.samples )
    # A row exists in the form for as long as its name field has a value
    while params.get( 'sample_%i_name' % sample_index, False ):
        field_prefix = 'sample_%i_' % sample_index
        library_id = params.get( field_prefix + 'library_id', None )
        folder_id = params.get( field_prefix + 'folder_id', None )
        # Data library
        try:
            library = trans.sa_session.query( trans.model.Library ).get( trans.security.decode_id( library_id ) )
        except:
            library = None
        # Folder, which is only looked up when a library was found
        folder = None
        if library is not None:
            try:
                folder = trans.sa_session.query( trans.model.LibraryFolder ).get( trans.security.decode_id( folder_id ) )
            except:
                # Fall back to the root folder of the library
                if library:
                    folder = library.root_folder
        sample_info = dict( name=util.restore_text( params.get( field_prefix + 'name', '' ) ),
                            barcode=util.restore_text( params.get( field_prefix + 'barcode', '' ) ),
                            library=library,
                            folder=folder )
        sample_info[ 'field_values' ] = [ util.restore_text( params.get( field_prefix + 'field_%i' % field_index, '' ) )
                                          for field_index in range( len( request.type.sample_form.fields ) ) ]
        if managing_samples:
            # The row edits a saved sample, so its select fields are built from that sample
            row_select_fields = self.__build_library_and_folder_select_fields( trans,
                                                                               request.user,
                                                                               sample_index,
                                                                               libraries,
                                                                               request.samples[ sample_index ],
                                                                               **kwd )
        else:
            row_select_fields = self.__build_library_and_folder_select_fields( trans,
                                                                               request.user,
                                                                               sample_index,
                                                                               libraries,
                                                                               None,
                                                                               library_id,
                                                                               folder_id,
                                                                               **kwd )
        sample_info[ 'library_select_field' ], sample_info[ 'folder_select_field' ] = row_select_fields
        if managing_samples:
            current_samples[ sample_index ] = sample_info
        else:
            current_samples.append( sample_info )
        sample_index += 1
    return current_samples, managing_samples, libraries
@web.expose
@web.require_login( "delete sample from sequencing request" )
def delete_sample( self, trans, cntrller, **kwd ):
    """
    Delete one sample ( and its form values ) from a request and redirect
    back to the manage_request page.

    'request_id' is the encoded id of the request.  'sample_id' is not a
    database id: it is the zero-based index of the sample's row in the list
    returned by __get_sample_info().  A row that has not been saved yet has
    no matching sample, in which case nothing is deleted.  An index that is
    not a number or does not refer to a row is reported as an error.
    """
    params = util.Params( kwd )
    status = params.get( 'status', 'done' )
    message = util.restore_text( params.get( 'message', '' ) )
    request_id = params.get( 'request_id', None )
    try:
        request = trans.sa_session.query( trans.model.Request ).get( trans.security.decode_id( request_id ) )
    except Exception:
        # The id could not be decoded (missing or malformed)
        request = None
    if request is None:
        # Query.get() returns None when no row matches the decoded id
        return invalid_id_redirect( trans, cntrller, request_id )
    current_samples, managing_samples, libraries = self.__get_sample_info( trans, request, **kwd )
    try:
        sample_index = int( params.get( 'sample_id', 0 ) )
    except ( TypeError, ValueError ):
        sample_index = -1
    # Negative values are rejected too: they would silently index from the end of the list
    if 0 <= sample_index < len( current_samples ):
        sample_name = current_samples[ sample_index ][ 'name' ]
        sample = request.has_sample( sample_name )
        if sample:
            trans.sa_session.delete( sample.values )
            trans.sa_session.delete( sample )
            trans.sa_session.flush()
            message = 'Sample (%s) has been deleted.' % sample_name
    else:
        status = 'error'
        message = 'Invalid sample index (%s).' % str( params.get( 'sample_id', 0 ) )
    return trans.response.send_redirect( web.url_for( controller='requests_common',
                                                      action='manage_request',
                                                      cntrller=cntrller,
                                                      id=trans.security.encode_id( request.id ),
                                                      status=status,
                                                      message=message ) )
@web.expose
@web.require_login( "view data transfer page" )
def view_dataset_transfer( self, trans, cntrller, **kwd ):
    """
    Display the dataset transfer page of a sample.

    The sample is identified by the encoded 'sample_id' parameter; an
    invalid or unknown id redirects via invalid_id_redirect().  If the sample
    has no data library or folder yet, the user is sent back to the
    manage_request page with an error.  Admin users are redirected to the
    requests_admin manage_datasets action.  For other users the template is
    rendered with the sample's datasets and a starting folder path taken from
    the 'folder_path' parameter, else from the sample's last dataset, else
    from the request type's 'data_dir' setting.
    """
    params = util.Params( kwd )
    message = util.restore_text( params.get( 'message', '' ) )
    status = params.get( 'status', 'done' )
    is_admin = cntrller == 'requests_admin' and trans.user_is_admin()
    sample_id = params.get( 'sample_id', None )
    try:
        sample = trans.sa_session.query( trans.model.Sample ).get( trans.security.decode_id( sample_id ) )
    except Exception:
        # The id could not be decoded (missing or malformed)
        sample = None
    if sample is None:
        # Query.get() returns None when no row matches the decoded id
        return invalid_id_redirect( trans, cntrller, sample_id )
    # check if a library and folder has been set for this sample yet.
    if not sample.library or not sample.folder:
        status = 'error'
        message = "Set a data library and folder for sequencing request (%s) to transfer datasets." % sample.name
        return trans.response.send_redirect( web.url_for( controller='requests_common',
                                                          action='manage_request',
                                                          cntrller=cntrller,
                                                          id=trans.security.encode_id( sample.request.id ),
                                                          status=status,
                                                          message=message ) )
    if is_admin:
        return trans.response.send_redirect( web.url_for( controller='requests_admin',
                                                          action='manage_datasets',
                                                          sample_id=sample_id ) )
    datatx_info = sample.request.type.datatx_info
    folder_path = util.restore_text( params.get( 'folder_path', '' ) )
    if not folder_path:
        if len( sample.datasets ):
            # Start in the directory of the most recently listed dataset
            folder_path = os.path.dirname( sample.datasets[-1].file_path[:-1] )
        else:
            folder_path = util.restore_text( datatx_info.get( 'data_dir', '' ) )
    # Make sure a non-empty path ends with a path separator
    if folder_path and folder_path[-1] != os.sep:
        folder_path += os.sep
    # Use .get() so that a key missing from the login settings is reported to the
    # user in the same way as an empty value, instead of raising a KeyError.
    if not ( datatx_info.get( 'host' ) and datatx_info.get( 'username' ) and datatx_info.get( 'password' ) ):
        status = 'error'
        message = 'The sequencer login information is incomplete. Click on sequencer information to add login details.'
    return trans.fill_template( '/requests/common/dataset_transfer.mako',
                                cntrller=cntrller,
                                sample=sample,
                                dataset_files=sample.datasets,
                                message=message,
                                status=status,
                                files=[],
                                folder_path=folder_path )
def __import_samples( self, trans, cntrller, request, current_samples, libraries, **kwd ):
    """
    Reads the samples csv file and imports all the samples.  The format of the csv file is:
    SampleName,DataLibrary,DataLibraryFolder,Field1,Field2....

    One unsaved sample row is appended to current_samples per csv row and the
    manage_request page is rendered with the result.  Blank lines in the file
    are skipped.  If anything goes wrong while reading the file ( including a
    row with fewer than three columns ) nothing is imported and the user is
    redirected to the manage_request page with an error message.
    """
    params = util.Params( kwd )
    managing_samples = util.string_as_bool( params.get( 'managing_samples', False ) )
    file_obj = params.get( 'file_data', '' )
    try:
        reader = csv.reader( file_obj.file )
        for row in reader:
            if not row:
                # csv.reader yields an empty list for a blank line ( e.g. a trailing
                # newline ); skip it rather than failing the whole import.
                continue
            if len( row ) < 3:
                raise ValueError( 'line %i has %i column(s), at least 3 are required '
                                  '(SampleName,DataLibrary,DataLibraryFolder)' % ( reader.line_num, len( row ) ) )
            library_id = None
            folder_id = None
            # FIXME: this is bad - what happens when multiple libraries have the same name??
            lib = trans.sa_session.query( trans.model.Library ) \
                                  .filter( and_( trans.model.Library.table.c.name==row[1],
                                                 trans.model.Library.table.c.deleted==False ) ) \
                                  .first()
            if lib:
                # NOTE(review): the folder is looked up by name only, it is not
                # restricted to folders of the library found above.
                folder = trans.sa_session.query( trans.model.LibraryFolder ) \
                                         .filter( and_( trans.model.LibraryFolder.table.c.name==row[2],
                                                        trans.model.LibraryFolder.table.c.deleted==False ) ) \
                                         .first()
                if folder:
                    # NOTE(review): these are raw database ids, while the ids read from the
                    # form elsewhere in this controller are decoded before use - confirm
                    # which form __build_library_and_folder_select_fields expects.
                    library_id = lib.id
                    folder_id = folder.id
            library_select_field, folder_select_field = self.__build_library_and_folder_select_fields( trans,
                                                                                                       request.user,
                                                                                                       len( current_samples ),
                                                                                                       libraries,
                                                                                                       None,
                                                                                                       library_id,
                                                                                                       folder_id,
                                                                                                       **kwd )
            current_samples.append( dict( name=row[0],
                                          barcode='',
                                          library=None,
                                          folder=None,
                                          library_select_field=library_select_field,
                                          folder_select_field=folder_select_field,
                                          field_values=row[3:] ) )
    except Exception as e:
        status = 'error'
        message = 'Error thrown when importing samples file: %s' % str( e )
        return trans.response.send_redirect( web.url_for( controller='requests_common',
                                                          action='manage_request',
                                                          cntrller=cntrller,
                                                          id=trans.security.encode_id( request.id ),
                                                          status=status,
                                                          message=message ) )
    request_widgets = self.__get_request_widgets( trans, request.id )
    sample_copy = self.__build_copy_sample_select_field( trans, current_samples )
    return trans.fill_template( '/requests/common/manage_request.mako',
                                cntrller=cntrller,
                                request=request,
                                request_widgets=request_widgets,
                                current_samples=current_samples,
                                sample_copy=sample_copy,
                                managing_samples=managing_samples )
# ===== Methods for handling form definition widgets =====
def __get_request_widgets( self, trans, id ):
"""Get the widgets for the request"""
request = trans.sa_session.query( trans.model.Request ).get( id )
# The request_widgets list is a list of dictionaries
request_widgets = []
for index, field in enumerate( request.type.request_form.fields ):
if field[ 'required' ]:
required_label = 'Required'
else:
required_label = 'Optional'
if field[ 'type' ] == 'AddressField':
if request.values.content[ index ]:
request_widgets.append( dict( label=field[ 'label' ],
value=trans.sa_session.query( trans.model.UserAddress ).get( int( request.values.content[ index ] ) ).get_html(),
helptext=field[ 'helptext' ] + ' (' + required_label + ')' ) )
else:
request_widgets.append( dict( label=field[ 'label' ],
value=None,
helptext=field[ 'helptext' ] + ' (' + required_label + ')' ) )
else:
request_widgets.append( dict( label=field[ 'label' ],
value=request.values.content[ index ],
helptext=field[ 'helptext' ] + ' (' + required_label + ')' ) )
return request_widgets
def __get_samples_widgets( self, trans, request, libraries, **kwd ):
"""Get the widgets for all of the samples currently associated with the request"""
# The current_samples_widgets list is a list of dictionaries
current_samples_widgets = []
for index, sample in enumerate( request.samples ):
# Build the library_select_field and folder_select_field for each existing sample
library_select_field, folder_select_field = self.__build_library_and_folder_select_fields( trans,
user=request.user,
sample_index=index,
libraries=libraries,
sample=sample,
**kwd )
# Append the dictionary for the current sample to the current_samples_widgets list
current_samples_widgets.append( dict( name=sample.name,
barcode=sample.bar_code,
library=sample.library,
folder=sample.folder,
field_values=sample.values.content,
library_select_field=library_select_field,
folder_select_field=folder_select_field ) )
return current_samples_widgets
# ===== Methods for building SelectFields used on various request forms =====
def __build_copy_sample_select_field( self, trans, current_samples ):
    """
    Build the 'copy_sample_index' SelectField.  Its first, pre-selected
    option is 'None' ( value -1 ); it is followed by one option per current
    sample, labelled with the sample's name and valued with its list index.
    """
    select_field = SelectField( 'copy_sample_index' )
    select_field.add_option( 'None', -1, selected=True )
    for position, sample_info in enumerate( current_samples ):
        select_field.add_option( sample_info[ 'name' ], position )
    return select_field
def __build_request_type_id_select_field( self, trans, selected_value='none' ):
    """
    Build the 'request_type_id' SelectField ( refreshing on change ) from the
    request types accessible to the current user, labelled by name.
    """
    return build_select_field( trans,
                               trans.user.accessible_request_types( trans ),
                               'name',
                               'request_type_id',
                               selected_value=selected_value,
                               refresh_on_change=True )
def __build_user_id_select_field( self, trans, selected_value='none' ):
    """
    Build the 'user_id' SelectField from all users that are not deleted,
    labelled and ordered by email address.
    """
    user_table = trans.model.User.table
    users_query = trans.sa_session.query( trans.model.User )
    users_query = users_query.filter( user_table.c.deleted == False )
    users_query = users_query.order_by( trans.model.User.email.asc() )
    # A refresh_on_change is required so the user's set of addresses can be displayed.
    return build_select_field( trans,
                               users_query,
                               'email',
                               'user_id',
                               selected_value=selected_value,
                               refresh_on_change=True )
def __build_sample_operation_select_field( self, trans, is_admin, request, selected_value ):
    """
    Build the 'sample_operation' SelectField listing the bulk operations that
    may be applied to the selected samples of a request.

    Admin users get CHANGE_STATE for a complete request, SELECT_LIBRARY for a
    rejected request and every bulk operation otherwise.  Other users get no
    operation for a complete request and SELECT_LIBRARY otherwise.
    """
    # The sample_operation SelectField is displayed only after the request has been submitted.
    # Its label is "For selected samples"
    operations = trans.model.Sample.bulk_operations
    if is_admin:
        if request.is_complete:
            bulk_operations = [ operations.CHANGE_STATE ]
        elif request.is_rejected:
            # This must be an elif: with two separate if statements the else branch
            # below also ran for complete requests and overwrote their list.
            bulk_operations = [ operations.SELECT_LIBRARY ]
        else:
            bulk_operations = [ s for i, s in operations.items() ]
    else:
        if request.is_complete:
            bulk_operations = []
        else:
            bulk_operations = [ operations.SELECT_LIBRARY ]
    return build_select_field( trans, bulk_operations, 'self', 'sample_operation', selected_value=selected_value, refresh_on_change=True )
def __build_library_and_folder_select_fields( self, trans, user, sample_index, libraries, sample=None, library_id=None, folder_id=None, **kwd ):
    """
    Build the library and folder SelectFields for the sample row at
    sample_index and return them as a ( library_select_field,
    folder_select_field ) tuple.

    The libraries param holds all the libraries accessible to the user; they
    become the options of the library SelectField.  It is a dictionary that
    looks like { library : '1,2', library : '3' }: its keys are the libraries
    to display and its values are strings of comma-separated ids of folders
    that should NOT be displayed.  When a library is selected, its showable
    folders become the options of the folder SelectField.

    TODO: all object ids received in the params must be encoded.
    """
    params = util.Params( kwd )
    library_field_name = "sample_%i_library_id" % sample_index
    folder_field_name = "sample_%i_folder_id" % sample_index
    if not library_id:
        library_id = params.get( library_field_name, 'none' )
    selected_library = None
    hidden_folder_ids_of_selected = []
    showable_folders = []
    # Nothing was selected in the form, so fall back to the library saved with the sample
    if sample and sample.library and library_id == 'none':
        library_id = str( sample.library.id )
        selected_library = sample.library
    # If we have a selected library, get the list of its folders that are not accessible to the current user
    wanted_id = str( library_id )
    for candidate_library, hidden_ids in libraries.items():
        if trans.security.encode_id( candidate_library.id ) == wanted_id:
            selected_library = candidate_library
            hidden_folder_ids_of_selected = hidden_ids.split( ',' )
            break
    # sample_%i_library_id SelectField with refresh on change enabled
    library_select_field = build_select_field( trans,
                                               libraries.keys(),
                                               'name',
                                               library_field_name,
                                               initial_value='none',
                                               selected_value=str( library_id ).lower(),
                                               refresh_on_change=True )
    # Get all accessible folders for the selected library, if one is indeed selected
    if selected_library:
        add_action = trans.app.security_agent.permitted_actions.LIBRARY_ADD
        showable_folders = trans.app.security_agent.get_showable_folders( user,
                                                                          user.all_roles(),
                                                                          selected_library,
                                                                          [ add_action ],
                                                                          hidden_folder_ids_of_selected )
    if sample:
        # The user is editing the request, and may have previously selected a folder
        if sample.folder:
            selected_folder_id = sample.folder.id
        elif sample.library:
            # If a library is selected but not a folder, use the library's root folder
            selected_folder_id = sample.library.root_folder.id
        else:
            # The user just selected a folder
            selected_folder_id = params.get( folder_field_name, 'none' )
    elif folder_id:
        # TODO: not sure when this would be passed
        selected_folder_id = folder_id
    else:
        selected_folder_id = 'none'
    # TODO: Change the name of the library root folder to "Library root" to clarify to the
    # user that it is the root folder.  We probably should just change this in the Library code,
    # and update the data in the db.
    folder_select_field = build_select_field( trans,
                                              showable_folders,
                                              'name',
                                              folder_field_name,
                                              initial_value='none',
                                              selected_value=selected_folder_id )
    return library_select_field, folder_select_field
def __build_sample_state_id_select_field( self, trans, request, selected_value ):
    """
    Build the 'sample_state_id' SelectField from the states of the request's
    type.  When selected_value is 'none', the state of the request's first
    sample is pre-selected, or the first state of the request type if the
    request has no samples.
    """
    if selected_value == 'none':
        if request.samples:
            default_state = request.samples[0].state
        else:
            default_state = request.type.states[0]
        selected_value = trans.security.encode_id( default_state.id )
    return build_select_field( trans,
                               request.type.states,
                               'name',
                               'sample_state_id',
                               selected_value=selected_value,
                               refresh_on_change=False )
# ===== Methods for validating forms and fields =====
def __validate_request( self, trans, cntrller, request ):
"""Validates the request entered by the user"""
# TODO: Add checks for required sample fields here.
empty_fields = []
# Make sure required form fields are filled in.
for index, field in enumerate( request.type.request_form.fields ):
if field[ 'required' ] == 'required' and request.values.content[ index ] in [ '', None ]:
empty_fields.append( field[ 'label' ] )
if empty_fields:
message = 'Complete the following fields of the request before submitting: '
for ef in empty_fields:
message += '' + ef + ' '
return message
return None
def __validate_barcode( self, trans, sample, barcode ):
"""
Makes sure that the barcode about to be assigned to a sample is gobally unique.
That is, barcodes must be unique across requests in Galaxy sample tracking.
"""
message = ''
unique = True
for index in range( len( sample.request.samples ) ):
# Check for empty bar code
if not barcode.strip():
message = 'Fill in the barcode for sample (%s).' % sample.name
break
# TODO: Add a unique constraint to sample.bar_code table column
# Make sure bar code is unique
for sample_has_bar_code in trans.sa_session.query( trans.model.Sample ) \
.filter( trans.model.Sample.table.c.bar_code == barcode ):
if sample_has_bar_code and sample_has_bar_code.id != sample.id:
message = '''The bar code (%s) associated with the sample (%s) belongs to another sample.
Bar codes must be unique across all samples, so use a different bar code
for this sample.''' % ( barcode, sample.name )
unique = False
break
if not unique:
break
return message
def __validate_email( self, email ):
error = ''
if len( email ) == 0 or "@" not in email or "." not in email:
error = "(%s) is not a valid email address. " % str( email )
elif len( email ) > 255:
error = "(%s) exceeds maximum allowable length. " % str( email )
return error
# ===== Other miscellaneous utility methods =====
def __get_selected_samples( self, trans, request, **kwd ):
selected_samples = []
for sample in request.samples:
if CheckboxField.is_checked( kwd.get( 'select_sample_%i' % sample.id, '' ) ):
selected_samples.append( trans.security.encode_id( sample.id ) )
return selected_samples
# ===== Miscellaneous utility methods outside of the RequestsCommon class =====
def invalid_id_redirect( trans, cntrller, obj_id, action='browse_requests' ):
    """
    Redirect to the given action of the cntrller controller with an 'error'
    status and a message reporting obj_id as an invalid request id.
    """
    redirect_url = web.url_for( controller=cntrller,
                                action=action,
                                status='error',
                                message="Invalid request id (%s)" % str( obj_id ) )
    return trans.response.send_redirect( redirect_url )