1 | #/usr/bin/python |
---|
2 | |
---|
3 | from datetime import datetime |
---|
4 | import sys |
---|
5 | import optparse |
---|
6 | import os |
---|
7 | import time |
---|
8 | import logging |
---|
9 | |
---|
10 | assert sys.version_info[:2] >= ( 2, 4 ) |
---|
11 | new_path = [ os.path.join( os.getcwd(), "lib" ) ] |
---|
12 | new_path.extend( sys.path[1:] ) # remove scripts/ from the path |
---|
13 | sys.path = new_path |
---|
14 | from galaxy import eggs |
---|
15 | from galaxy.model.custom_types import * |
---|
16 | import pkg_resources |
---|
17 | pkg_resources.require( "psycopg2" ) |
---|
18 | import psycopg2 |
---|
19 | pkg_resources.require( "SQLAlchemy >= 0.4" ) |
---|
20 | from sqlalchemy import * |
---|
21 | from sqlalchemy.orm import sessionmaker |
---|
22 | |
---|
23 | #logging.basicConfig(level=logging.DEBUG) |
---|
24 | #log = logging.getLogger( 'GalaxyDbInterface' ) |
---|
25 | |
---|
26 | class GalaxyDbInterface(object): |
---|
27 | |
---|
28 | def __init__(self, dbstr): |
---|
29 | self.dbstr = dbstr |
---|
30 | self.db_engine = create_engine(self.dbstr) |
---|
31 | self.db_engine.echo = True |
---|
32 | self.metadata = MetaData(self.db_engine) |
---|
33 | self.session = sessionmaker(bind=self.db_engine) |
---|
34 | self.event_table = Table('sample_event', self.metadata, autoload=True ) |
---|
35 | self.sample_table = Table('sample', self.metadata, autoload=True ) |
---|
36 | self.sample_dataset_table = Table('sample_dataset', self.metadata, autoload=True ) |
---|
37 | self.request_table = Table('request', self.metadata, autoload=True ) |
---|
38 | self.request_event_table = Table('request_event', self.metadata, autoload=True ) |
---|
39 | self.state_table = Table('sample_state', self.metadata, autoload=True ) |
---|
40 | |
---|
41 | def get_sample_id(self, field_name='bar_code', value=None): |
---|
42 | if not value: |
---|
43 | return -1 |
---|
44 | sample_id = -1 |
---|
45 | if field_name =='name': |
---|
46 | stmt = select(columns=[self.sample_table.c.id], |
---|
47 | whereclause=self.sample_table.c.name==value) |
---|
48 | result = stmt.execute() |
---|
49 | sample_id = result.fetchone()[0] |
---|
50 | elif field_name == 'bar_code': |
---|
51 | stmt = select(columns=[self.sample_table.c.id], |
---|
52 | whereclause=self.sample_table.c.bar_code==value) |
---|
53 | result = stmt.execute() |
---|
54 | x = result.fetchone() |
---|
55 | if x: |
---|
56 | sample_id = x[0] |
---|
57 | #log.debug('Sample ID: %i' % sample_id) |
---|
58 | return sample_id |
---|
59 | return -1 |
---|
60 | |
---|
61 | def get_request_id(self, sample_id): |
---|
62 | query = select(columns=[self.sample_table.c.request_id], |
---|
63 | whereclause=self.sample_table.c.id==sample_id) |
---|
64 | request_id = query.execute().fetchall()[0][0] |
---|
65 | return request_id |
---|
66 | |
---|
67 | def current_state(self, sample_id): |
---|
68 | ''' |
---|
69 | This method returns the current state of the sample for the given sample_id |
---|
70 | ''' |
---|
71 | stmt = select(columns=[self.event_table.c.sample_state_id], |
---|
72 | whereclause=self.event_table.c.sample_id==sample_id, |
---|
73 | order_by=self.event_table.c.update_time.desc()) |
---|
74 | result = stmt.execute() |
---|
75 | all_states = result.fetchall() |
---|
76 | current_state_id = all_states[0][0] |
---|
77 | return current_state_id |
---|
78 | |
---|
79 | def all_possible_states(self, sample_id): |
---|
80 | subsubquery = select(columns=[self.sample_table.c.request_id], |
---|
81 | whereclause=self.sample_table.c.id==sample_id) |
---|
82 | self.request_id = subsubquery.execute().fetchall()[0][0] |
---|
83 | #log.debug('REQUESTID: %i' % self.request_id) |
---|
84 | subquery = select(columns=[self.request_table.c.request_type_id], |
---|
85 | whereclause=self.request_table.c.id==self.request_id) |
---|
86 | request_type_id = subquery.execute().fetchall()[0][0] |
---|
87 | #log.debug('REQUESTTYPEID: %i' % request_type_id) |
---|
88 | query = select(columns=[self.state_table.c.id, self.state_table.c.name], |
---|
89 | whereclause=self.state_table.c.request_type_id==request_type_id, |
---|
90 | order_by=self.state_table.c.id.asc()) |
---|
91 | states = query.execute().fetchall() |
---|
92 | #log.debug('POSSIBLESTATES: '+ str(states)) |
---|
93 | return states |
---|
94 | |
---|
95 | def change_state(self, sample_id, new_state=None): |
---|
96 | ''' |
---|
97 | This method changes the state of the sample to the the 'new_state' |
---|
98 | ''' |
---|
99 | if not new_state: |
---|
100 | return |
---|
101 | new_state_id = -1 |
---|
102 | # find the state_id for this new state in the list of possible states |
---|
103 | possible_states = self.all_possible_states(sample_id) |
---|
104 | for state_id, state_name in possible_states: |
---|
105 | if new_state == state_name: |
---|
106 | new_state_id = state_id |
---|
107 | if new_state_id == -1: |
---|
108 | return |
---|
109 | #log.debug('Updating sample_id %i state to %s' % (sample_id, new_state)) |
---|
110 | i = self.event_table.insert() |
---|
111 | i.execute(update_time=datetime.utcnow(), |
---|
112 | create_time=datetime.utcnow(), |
---|
113 | sample_id=sample_id, |
---|
114 | sample_state_id=int(new_state_id), |
---|
115 | comment='Update by barcode scan') |
---|
116 | # if all the samples for this request are in the final state |
---|
117 | # then change the request state to 'Complete' |
---|
118 | result = select(columns=[self.sample_table.c.id], |
---|
119 | whereclause=self.sample_table.c.request_id==self.request_id).execute() |
---|
120 | sample_id_list = result.fetchall() |
---|
121 | request_complete = True |
---|
122 | for sid in sample_id_list: |
---|
123 | current_state_id = self.current_state(sid[0]) |
---|
124 | if current_state_id != possible_states[-1][0]: |
---|
125 | request_complete = False |
---|
126 | break |
---|
127 | if request_complete: |
---|
128 | request_state = 'Complete' |
---|
129 | #log.debug('Updating request_id %i state to "%s"' % (self.request_id, request_state)) |
---|
130 | i = self.request_event_table.insert() |
---|
131 | i.execute(update_time=datetime.utcnow(), |
---|
132 | create_time=datetime.utcnow(), |
---|
133 | request_id=self.request_id, |
---|
134 | state=request_state, |
---|
135 | comment='All samples of this request have finished processing.') |
---|
136 | |
---|
137 | def set_sample_dataset_status(self, id, new_status, msg=None): |
---|
138 | u = self.sample_dataset_table.update(whereclause=self.sample_dataset_table.c.id==int(id)) |
---|
139 | u.execute(status=new_status) |
---|
140 | if new_status == 'Error': |
---|
141 | u.execute(error_msg=msg) |
---|
142 | else: |
---|
143 | u.execute(error_msg='') |
---|
144 | return |
---|
145 | |
---|
146 | |
---|
147 | |
---|
148 | if __name__ == '__main__': |
---|
149 | print '''This file should not be run directly. To start the Galaxy AMQP Listener: |
---|
150 | %sh run_galaxy_listener.sh''' |
---|
151 | dbstr = 'postgres://postgres:postgres@localhost/g2' |
---|
152 | |
---|
153 | parser = optparse.OptionParser() |
---|
154 | parser.add_option('-n', '--name', help='name of the sample field', dest='name', \ |
---|
155 | action='store', default='bar_code') |
---|
156 | parser.add_option('-v', '--value', help='value of the sample field', dest='value', \ |
---|
157 | action='store') |
---|
158 | parser.add_option('-s', '--state', help='new state of the sample', dest='state', \ |
---|
159 | action='store') |
---|
160 | (opts, args) = parser.parse_args() |
---|
161 | |
---|
162 | gs = GalaxyDbInterface(dbstr) |
---|
163 | sample_id = gs.get_sample_id(field_name=opts.name, value=opts.value) |
---|
164 | gs.change_state(sample_id, opts.state) |
---|
165 | |
---|
166 | |
---|
167 | |
---|