root/galaxy-central/scripts/galaxy_messaging/server/galaxydb_interface.py @ 2

リビジョン 2, 6.9 KB (コミッタ: hatakeyama, 14 年 前)

import galaxy-central

Rev行番号 
[2]1#/usr/bin/python
2
3from datetime import datetime
4import sys
5import optparse
6import os
7import time
8import logging
9
10assert sys.version_info[:2] >= ( 2, 4 )
11new_path = [ os.path.join( os.getcwd(), "lib" ) ]
12new_path.extend( sys.path[1:] ) # remove scripts/ from the path
13sys.path = new_path
14from galaxy import eggs
15from galaxy.model.custom_types import *
16import pkg_resources
17pkg_resources.require( "psycopg2" )
18import psycopg2
19pkg_resources.require( "SQLAlchemy >= 0.4" )
20from sqlalchemy import *
21from sqlalchemy.orm import sessionmaker
22
23#logging.basicConfig(level=logging.DEBUG)
24#log = logging.getLogger( 'GalaxyDbInterface' )
25
26class GalaxyDbInterface(object):
27   
28    def __init__(self, dbstr):
29        self.dbstr = dbstr
30        self.db_engine = create_engine(self.dbstr)   
31        self.db_engine.echo = True 
32        self.metadata = MetaData(self.db_engine)
33        self.session = sessionmaker(bind=self.db_engine)
34        self.event_table = Table('sample_event', self.metadata, autoload=True )
35        self.sample_table = Table('sample', self.metadata, autoload=True )
36        self.sample_dataset_table = Table('sample_dataset', self.metadata, autoload=True )
37        self.request_table = Table('request', self.metadata, autoload=True )
38        self.request_event_table = Table('request_event', self.metadata, autoload=True )
39        self.state_table = Table('sample_state', self.metadata, autoload=True  )
40
41    def get_sample_id(self, field_name='bar_code', value=None):
42        if not value:
43            return -1
44        sample_id = -1
45        if field_name =='name':
46            stmt = select(columns=[self.sample_table.c.id],
47                          whereclause=self.sample_table.c.name==value)
48            result = stmt.execute()
49            sample_id = result.fetchone()[0]
50        elif field_name == 'bar_code':
51            stmt = select(columns=[self.sample_table.c.id],
52                          whereclause=self.sample_table.c.bar_code==value)
53            result = stmt.execute()
54            x = result.fetchone()
55            if x:
56                sample_id = x[0]
57                #log.debug('Sample ID: %i' % sample_id)
58                return sample_id
59        return -1
60   
61    def get_request_id(self, sample_id):
62        query = select(columns=[self.sample_table.c.request_id],
63                       whereclause=self.sample_table.c.id==sample_id)
64        request_id = query.execute().fetchall()[0][0]
65        return request_id
66       
67    def current_state(self, sample_id):
68        '''
69        This method returns the current state of the sample for the given sample_id
70        '''
71        stmt = select(columns=[self.event_table.c.sample_state_id],
72                      whereclause=self.event_table.c.sample_id==sample_id,
73                      order_by=self.event_table.c.update_time.desc())
74        result = stmt.execute()
75        all_states = result.fetchall()
76        current_state_id = all_states[0][0]
77        return current_state_id
78       
79    def all_possible_states(self, sample_id):
80        subsubquery = select(columns=[self.sample_table.c.request_id],
81                             whereclause=self.sample_table.c.id==sample_id)
82        self.request_id = subsubquery.execute().fetchall()[0][0]
83        #log.debug('REQUESTID: %i' % self.request_id)
84        subquery = select(columns=[self.request_table.c.request_type_id],
85                          whereclause=self.request_table.c.id==self.request_id)
86        request_type_id = subquery.execute().fetchall()[0][0]
87        #log.debug('REQUESTTYPEID: %i' % request_type_id)
88        query = select(columns=[self.state_table.c.id, self.state_table.c.name],
89                       whereclause=self.state_table.c.request_type_id==request_type_id,
90                       order_by=self.state_table.c.id.asc())
91        states = query.execute().fetchall()
92        #log.debug('POSSIBLESTATES: '+ str(states))
93        return states
94   
95    def change_state(self, sample_id, new_state=None):
96        '''
97        This method changes the state of the sample to the the 'new_state'
98        '''
99        if not new_state:
100            return
101        new_state_id = -1
102        # find the state_id for this new state in the list of possible states
103        possible_states = self.all_possible_states(sample_id)
104        for state_id, state_name in possible_states:
105            if new_state == state_name:
106                new_state_id = state_id
107        if new_state_id == -1:
108            return
109        #log.debug('Updating sample_id %i state to %s' % (sample_id, new_state))
110        i = self.event_table.insert()
111        i.execute(update_time=datetime.utcnow(),
112                  create_time=datetime.utcnow(),
113                  sample_id=sample_id,
114                  sample_state_id=int(new_state_id),
115                  comment='Update by barcode scan')
116        # if all the samples for this request are in the final state
117        # then change the request state to 'Complete'
118        result = select(columns=[self.sample_table.c.id],
119                        whereclause=self.sample_table.c.request_id==self.request_id).execute()
120        sample_id_list = result.fetchall()
121        request_complete = True
122        for sid in sample_id_list:
123            current_state_id = self.current_state(sid[0])
124            if current_state_id != possible_states[-1][0]:
125                request_complete = False
126                break
127        if request_complete:
128            request_state = 'Complete'
129            #log.debug('Updating request_id %i state to "%s"' % (self.request_id, request_state))
130            i = self.request_event_table.insert()
131            i.execute(update_time=datetime.utcnow(),
132                      create_time=datetime.utcnow(),
133                      request_id=self.request_id,
134                      state=request_state,
135                      comment='All samples of this request have finished processing.')
136
137    def set_sample_dataset_status(self, id, new_status, msg=None):
138        u = self.sample_dataset_table.update(whereclause=self.sample_dataset_table.c.id==int(id))
139        u.execute(status=new_status)
140        if new_status == 'Error':
141            u.execute(error_msg=msg)
142        else:
143            u.execute(error_msg='')
144        return
145   
146
147
148if __name__ == '__main__':
149    print '''This file should not be run directly. To start the Galaxy AMQP Listener:
150    %sh run_galaxy_listener.sh'''
151    dbstr = 'postgres://postgres:postgres@localhost/g2'
152
153    parser = optparse.OptionParser()
154    parser.add_option('-n', '--name', help='name of the sample field', dest='name', \
155                      action='store', default='bar_code')
156    parser.add_option('-v', '--value', help='value of the sample field', dest='value', \
157                      action='store')
158    parser.add_option('-s', '--state', help='new state of the sample', dest='state', \
159                      action='store')
160    (opts, args) = parser.parse_args()
161
162    gs = GalaxyDbInterface(dbstr)
163    sample_id = gs.get_sample_id(field_name=opts.name, value=opts.value)
164    gs.change_state(sample_id, opts.state)
165
166           
167
Note: リポジトリブラウザについてのヘルプは TracBrowser を参照してください。