#!/usr/bin/python
---|
| 2 | |
---|
| 3 | from datetime import datetime |
---|
| 4 | import sys |
---|
| 5 | import optparse |
---|
| 6 | import os |
---|
| 7 | import time |
---|
| 8 | import logging |
---|
| 9 | |
---|
# Refuse to run on interpreters older than Python 2.4.
assert sys.version_info[:2] >= (2, 4)
# Build the import path: <current working directory>/lib goes first and the
# previous first entry is dropped (the original comment identifies it as the
# scripts/ directory).
new_path = [os.path.join(os.getcwd(), "lib")] + sys.path[1:]
sys.path = new_path
---|
| 14 | from galaxy import eggs |
---|
| 15 | from galaxy.model.custom_types import * |
---|
| 16 | import pkg_resources |
---|
| 17 | pkg_resources.require( "psycopg2" ) |
---|
| 18 | import psycopg2 |
---|
| 19 | pkg_resources.require( "SQLAlchemy >= 0.4" ) |
---|
| 20 | from sqlalchemy import * |
---|
| 21 | from sqlalchemy.orm import sessionmaker |
---|
| 22 | |
---|
| 23 | #logging.basicConfig(level=logging.DEBUG) |
---|
| 24 | #log = logging.getLogger( 'GalaxyDbInterface' ) |
---|
| 25 | |
---|
class GalaxyDbInterface(object):
    '''
    Helper around the database tables used for sample tracking.

    The tables 'sample_event', 'sample', 'sample_dataset', 'request',
    'request_event' and 'sample_state' are reflected from the database when
    the object is created. All statements are executed through the metadata
    bound to the engine.
    '''

    def __init__(self, dbstr):
        '''
        Create the engine for the SQLAlchemy database URL 'dbstr' and reflect
        the tables used by the other methods.
        '''
        self.dbstr = dbstr
        self.db_engine = create_engine(self.dbstr)
        # Every statement sent to the database is echoed by the engine.
        self.db_engine.echo = True
        self.metadata = MetaData(self.db_engine)
        # Session factory bound to the engine; no method of this class uses it.
        self.session = sessionmaker(bind=self.db_engine)
        self.event_table = Table('sample_event', self.metadata, autoload=True)
        self.sample_table = Table('sample', self.metadata, autoload=True)
        self.sample_dataset_table = Table('sample_dataset', self.metadata, autoload=True)
        self.request_table = Table('request', self.metadata, autoload=True)
        self.request_event_table = Table('request_event', self.metadata, autoload=True)
        self.state_table = Table('sample_state', self.metadata, autoload=True)

    @staticmethod
    def _first_value(rows, default=None):
        '''
        Return the first column of the first row in 'rows', or 'default' when
        'rows' is empty.
        '''
        if rows:
            return rows[0][0]
        return default

    @staticmethod
    def _state_id_for_name(states, state_name):
        '''
        Return the id paired with 'state_name' in 'states', a sequence of
        (id, name) pairs, or -1 when the name is not present. When the name
        occurs more than once the last occurrence wins.
        '''
        matched_id = -1
        for state_id, name in states:
            if name == state_name:
                matched_id = state_id
        return matched_id

    def get_sample_id(self, field_name='bar_code', value=None):
        '''
        Return the id of the sample whose 'field_name' column equals 'value'.

        'field_name' may be 'name' or 'bar_code'. -1 is returned when 'value'
        is empty, when 'field_name' is anything else, or when no sample
        matches.
        '''
        if not value:
            return -1
        if field_name == 'name':
            column = self.sample_table.c.name
        elif field_name == 'bar_code':
            column = self.sample_table.c.bar_code
        else:
            return -1
        stmt = select(columns=[self.sample_table.c.id],
                      whereclause=column==value)
        # Both lookups share one code path so a match is always returned and
        # a miss always yields -1 instead of raising.
        return self._first_value(stmt.execute().fetchall(), -1)

    def get_request_id(self, sample_id):
        '''
        Return the request_id of the sample with the given 'sample_id'.

        Raises IndexError when no such sample exists.
        '''
        query = select(columns=[self.sample_table.c.request_id],
                       whereclause=self.sample_table.c.id==sample_id)
        return query.execute().fetchall()[0][0]

    def current_state(self, sample_id):
        '''
        This method returns the current state id of the sample for the given
        sample_id, i.e. the sample_state_id of its most recently updated
        event. None is returned when the sample has no events.
        '''
        stmt = select(columns=[self.event_table.c.sample_state_id],
                      whereclause=self.event_table.c.sample_id==sample_id,
                      order_by=self.event_table.c.update_time.desc())
        return self._first_value(stmt.execute().fetchall(), None)

    def all_possible_states(self, sample_id):
        '''
        Return the (id, name) rows of every state defined for the request
        type of the request that owns the sample, ordered by ascending id.

        The request id is also stored in self.request_id. Raises IndexError
        when the sample or its request cannot be found.
        '''
        self.request_id = self.get_request_id(sample_id)
        #log.debug('REQUESTID: %i' % self.request_id)
        subquery = select(columns=[self.request_table.c.request_type_id],
                          whereclause=self.request_table.c.id==self.request_id)
        request_type_id = subquery.execute().fetchall()[0][0]
        #log.debug('REQUESTTYPEID: %i' % request_type_id)
        query = select(columns=[self.state_table.c.id, self.state_table.c.name],
                       whereclause=self.state_table.c.request_type_id==request_type_id,
                       order_by=self.state_table.c.id.asc())
        states = query.execute().fetchall()
        #log.debug('POSSIBLESTATES: '+ str(states))
        return states

    def change_state(self, sample_id, new_state=None):
        '''
        This method changes the state of the sample to the 'new_state'.

        Nothing is done when 'new_state' is empty or is not the name of one
        of the possible states of the sample. Otherwise a sample event is
        inserted, and when every sample of the same request is then in the
        last possible state (highest state id) a 'Complete' request event is
        inserted as well.
        '''
        if not new_state:
            return
        # find the state_id for this new state in the list of possible states
        possible_states = self.all_possible_states(sample_id)
        new_state_id = self._state_id_for_name(possible_states, new_state)
        if new_state_id == -1:
            return
        # all_possible_states() stored the owning request id on the instance.
        request_id = self.request_id
        #log.debug('Updating sample_id %i state to %s' % (sample_id, new_state))
        # One timestamp so create_time and update_time of the new row agree.
        now = datetime.utcnow()
        self.event_table.insert().execute(update_time=now,
                                          create_time=now,
                                          sample_id=sample_id,
                                          sample_state_id=int(new_state_id),
                                          comment='Update by barcode scan')
        # if all the samples for this request are in the final state
        # then change the request state to 'Complete'
        final_state_id = possible_states[-1][0]
        result = select(columns=[self.sample_table.c.id],
                        whereclause=self.sample_table.c.request_id==request_id).execute()
        for row in result.fetchall():
            # A sample without events reports None and so is never final.
            if self.current_state(row[0]) != final_state_id:
                return
        request_state = 'Complete'
        #log.debug('Updating request_id %i state to "%s"' % (self.request_id, request_state))
        now = datetime.utcnow()
        self.request_event_table.insert().execute(update_time=now,
                                                  create_time=now,
                                                  request_id=request_id,
                                                  state=request_state,
                                                  comment='All samples of this request have finished processing.')

    def set_sample_dataset_status(self, id, new_status, msg=None):
        '''
        Set the status of the sample dataset with the given 'id' to
        'new_status'. The error_msg column is set to 'msg' when the new
        status is 'Error' and to the empty string otherwise.
        '''
        if new_status == 'Error':
            error_msg = msg
        else:
            error_msg = ''
        u = self.sample_dataset_table.update(whereclause=self.sample_dataset_table.c.id==int(id))
        # Status and message are written by a single UPDATE statement.
        u.execute(status=new_status, error_msg=error_msg)
        return
---|
| 145 | |
---|
| 146 | |
---|
| 147 | |
---|
if __name__ == '__main__':
    # Manual entry point: looks a sample up by name or bar code and moves it
    # to the requested state in a hard-coded local database.
    print('''This file should not be run directly. To start the Galaxy AMQP Listener:
%sh run_galaxy_listener.sh''')
    dbstr = 'postgres://postgres:postgres@localhost/g2'

    parser = optparse.OptionParser()
    parser.add_option('-n', '--name', help='name of the sample field', dest='name',
                      action='store', default='bar_code')
    parser.add_option('-v', '--value', help='value of the sample field', dest='value',
                      action='store')
    parser.add_option('-s', '--state', help='new state of the sample', dest='state',
                      action='store')
    (opts, args) = parser.parse_args()

    gs = GalaxyDbInterface(dbstr)
    sample_id = gs.get_sample_id(field_name=opts.name, value=opts.value)
    # get_sample_id() reports "not found" as -1; passing that on would make
    # change_state() fail while looking up the request of a missing sample.
    if sample_id == -1:
        sys.exit('No sample found with %s=%r' % (opts.name, opts.value))
    gs.change_state(sample_id, opts.state)
---|
| 165 | |
---|
| 166 | |
---|
| 167 | |
---|