root/galaxy-central/scripts/galaxy_messaging/server/data_transfer.py

リビジョン 2, 8.9 KB (コミッタ: hatakeyama, 14 年 前)

import galaxy-central

  • 属性 svn:executable の設定値 *
行番号 
1#!/usr/bin/env python
2"""
3
4Data Transfer Script: Sequencer to Galaxy
5
6This script is called from Galaxy LIMS once the lab admin starts the data
7transfer process using the user interface.
8
9Usage:
10
11python data_transfer.py <data_transfer_xml> <config_id_secret>
12
13
14"""
15import ConfigParser
16import sys, os, time, traceback
17import optparse
18import urllib,urllib2, cookielib, shutil
19import logging, time, datetime
20import xml.dom.minidom
21
# Module setup.  Imports and sys.path edits are interleaved below and their
# order matters; do not reorder without checking the helper modules.
# Remember sys.path[0] (the script's own directory when it is run as
# ``python data_transfer.py``) before any project import can change sys.path.
22sp = sys.path[0]
23
24from galaxydb_interface import GalaxyDbInterface
25
# NOTE(review): this version check only runs after the first project import
# and is skipped entirely under ``python -O``; it is repeated verbatim below.
26assert sys.version_info[:2] >= ( 2, 4 )
# Put the saved entry back in front of sys.path before importing
# galaxyweb_interface.  NOTE(review): presumably needed because an imported
# helper module may rewrite sys.path; confirm before simplifying.
27new_path = [ sp ]
28new_path.extend( sys.path )
29sys.path = new_path
30
31from galaxyweb_interface import GalaxyWebInterface
32
33assert sys.version_info[:2] >= ( 2, 4 )
# Final search path: Galaxy's lib/ directory (relative to the current working
# directory) followed by sys.path without its first entry.
# NOTE(review): despite the inline comment, only the first entry is dropped;
# any other copy of the script directory further down stays on the path.
34new_path = [ os.path.join( os.getcwd(), "lib" ) ]
35new_path.extend( sys.path[1:] ) # remove scripts/ from the path
36sys.path = new_path
37
38
# Galaxy modules, importable now that lib/ is on sys.path.
# from_json_string, to_json_string and simplejson are not referenced in the
# code shown here.
39from galaxy.util.json import from_json_string, to_json_string
40from galaxy.model import Sample
# galaxy.eggs is imported before the pkg_resources.require calls, presumably
# so Galaxy's bundled eggs can be resolved (TODO confirm).
41from galaxy import eggs
42import pkg_resources
43pkg_resources.require( "pexpect" )
44import pexpect
45
46pkg_resources.require( "simplejson" )
47import simplejson
48
# Per-process logger (the pid is part of its name) that writes DEBUG and
# above to data_transfer.log in the current working directory, each record
# prefixed with the time and the logger name.
49log = logging.getLogger("datatx_"+str(os.getpid()))
50log.setLevel(logging.DEBUG)
51fh = logging.FileHandler("data_transfer.log")
52fh.setLevel(logging.DEBUG)
53formatter = logging.Formatter("%(asctime)s - %(name)s - %(message)s")
54fh.setFormatter(formatter)
55log.addHandler(fh)
56
57
class DataTransfer(object):
    '''
    Copy the dataset files of one sample from a remote sequencer to the
    Galaxy server, add them to a data library and record the progress of
    every dataset in the Galaxy database.

    The job is described by an XML message (see __init__); the Galaxy
    connection settings are read from transfer_datasets.ini in the current
    working directory.
    '''

    # Text written to the log in place of the sequencer password.
    PASSWORD_MASK = '********'

    def __init__(self, msg, config_id_secret):
        '''
        Parse the XML job message and load the configuration.

        msg              -- XML text containing data_host, data_user,
                            data_password, sample_id, library_id, folder_id
                            and one dataset_id/file/name triple per dataset
        config_id_secret -- passed through to GalaxyWebInterface

        Exits the process with status 1 if the configuration cannot be read,
        the database cannot be reached or the staging directory cannot be
        created.  When the database connection already exists, the failure is
        also recorded as 'Error' for every dataset.
        '''
        # The raw message carries the sequencer password; keep it out of
        # data_transfer.log.
        log.info(self._masked_message(msg))
        self.dom = xml.dom.minidom.parseString(msg)
        self.host = self.get_value(self.dom, 'data_host')
        self.username = self.get_value(self.dom, 'data_user')
        self.password = self.get_value(self.dom, 'data_password')
        self.sample_id = self.get_value(self.dom, 'sample_id')
        self.library_id = self.get_value(self.dom, 'library_id')
        self.folder_id = self.get_value(self.dom, 'folder_id')
        self.dataset_files = self._parse_dataset_files(self.dom)
        self.config_id_secret = config_id_secret
        # Ids of the datasets whose copy failed; filled in by transfer_files().
        self.failed_dataset_ids = set()
        # Initialised up front so the error handler below can test them even
        # when the failure happens before they are assigned.
        self.database_connection = None
        self.galaxydb = None
        try:
            # Retrieve the upload user login information from the config file
            config = ConfigParser.ConfigParser()
            config.read('transfer_datasets.ini')
            self.datatx_email = config.get("data_transfer_user_login_info", "email")
            self.datatx_password = config.get("data_transfer_user_login_info", "password")
            self.server_host = config.get("universe_wsgi_config", "host")
            self.server_port = config.get("universe_wsgi_config", "port")
            self.database_connection = config.get("universe_wsgi_config", "database_connection")
            self.import_dir = config.get("universe_wsgi_config", "library_import_dir")
            # Connect first so that a failure while creating the staging
            # directory can still be recorded for the datasets.
            self.galaxydb = GalaxyDbInterface(self.database_connection)
            # Staging directory inside the library import directory, named
            # after this process id and today's date.
            self.server_dir = os.path.join(self.import_dir,
                                           'datatx_' + str(os.getpid()) + '_' +
                                           datetime.date.today().strftime("%d%b%Y"))
            # os.mkdir raises OSError when the directory cannot be created.
            os.mkdir(self.server_dir)
        except Exception:
            log.error(traceback.format_exc())
            log.error('FATAL ERROR')
            if self.galaxydb is not None:
                self.error_and_exit('Error')
            sys.exit(1)

    def _masked_message(self, msg):
        '''
        Return msg with the content of every <data_password> element replaced
        by PASSWORD_MASK.  This works on the raw text, so it also works when
        msg is not well-formed XML.
        '''
        import re
        pattern = re.compile(r'(<data_password\b[^>]*>).*?(</data_password\s*>)', re.DOTALL)
        return pattern.sub(r'\g<1>' + self.PASSWORD_MASK + r'\g<2>', msg)

    def _parse_dataset_files(self, dom):
        '''
        Return one dict(name, dataset_id, file) per dataset in the message,
        in document order.  Collection stops at the first missing or empty
        <file> element; dataset_id is converted to int.
        '''
        dataset_files = []
        index = 0
        while True:
            path = self.get_value_index(dom, 'file', index)
            if not path:
                break
            dataset_files.append(dict(name=self.get_value_index(dom, 'name', index),
                                      dataset_id=int(self.get_value_index(dom, 'dataset_id', index)),
                                      file=path))
            index += 1
        return dataset_files

    def start(self):
        '''
        Run the whole transfer and terminate the process.

        Copies the files from the sequencer, adds them to the data library
        and marks the successfully copied datasets COMPLETE.  Datasets whose
        copy failed keep the 'Error' status set by transfer_files().  Exits
        with status 1 if there were datasets but none could be copied;
        otherwise exits with status 0.
        '''
        self.transfer_files()
        if self.dataset_files and not self._transferred_dataset_ids():
            log.error('FATAL ERROR. None of the dataset files could be transferred')
            sys.exit(1)
        # add the datasets to the given library
        self.add_to_library()
        self._update_transferred_status(Sample.transfer_status.COMPLETE)
        # cleanup() is not called, so the staging directory is left in place.
        sys.exit(0)

    def cleanup(self):
        '''
        Remove the staging directory that held the dataset files before they
        were added to the data library.

        NOTE(review): the 60 second pause is unexplained here; presumably it
        gives Galaxy time to finish importing the files - confirm.
        '''
        try:
            time.sleep(60)
            shutil.rmtree(self.server_dir)
        except Exception:
            self.error_and_exit()

    def error_and_exit(self, msg=''):
        '''
        Report the exception currently being handled and terminate.

        Logs the traceback, stores msg plus the traceback as an 'Error'
        status for every dataset of the sample, then exits with status 1.
        Intended to be called from inside an except block.
        '''
        log.error(traceback.format_exc())
        log.error('FATAL ERROR.' + msg)
        self.update_status('Error', 'All', msg + "\n" + traceback.format_exc())
        sys.exit(1)

    def _local_path(self, df):
        '''Return the staging path that dataset file df is copied to.'''
        return os.path.join(self.server_dir, os.path.basename(df['name']))

    def _scp_command(self, df):
        '''
        Build the scp command line that copies df['file'] from the sequencer
        to self._local_path(df).

        The string is run by pexpect, which splits it itself without a
        local shell: text inside single quotes is passed literally.  Spaces
        in the remote path are backslash-escaped because scp hands that path
        to the remote shell.  The local destination is seen by no shell and
        is therefore not escaped.
        NOTE(review): a single quote inside either path still breaks the
        quoting.
        '''
        return "scp %s@%s:'%s' '%s'" % (self.username,
                                        self.host,
                                        df['file'].replace(' ', '\\ '),
                                        self._local_path(df))

    def transfer_files(self):
        '''
        Copy every dataset file from the remote sequencer to the staging
        directory with scp, driven through pexpect so the password prompt can
        be answered.

        Each dataset is marked TRANSFERRING first.  A failed copy is marked
        'Error' with its traceback, and its id is added to
        self.failed_dataset_ids.  The remaining datasets are still attempted.
        Returns self.failed_dataset_ids.
        '''
        def print_ticks(d):
            # Called by pexpect on every timeout; a falsy return value lets
            # pexpect.run keep waiting for scp to finish.
            pass
        self.failed_dataset_ids = set()
        for df in self.dataset_files:
            self.update_status(Sample.transfer_status.TRANSFERRING, df['dataset_id'])
            try:
                cmd = self._scp_command(df)
                log.debug(cmd)
                output = pexpect.run(cmd, events={'.ssword:*': self.password + '\r\n',
                                                  pexpect.TIMEOUT: print_ticks},
                                     timeout=10)
                log.debug(output)
                path = self._local_path(df)
                if not os.path.exists(path):
                    msg = 'Could not find the local file after transfer (%s)' % path
                    log.error(msg)
                    raise Exception(msg)
            except Exception:
                self.failed_dataset_ids.add(df['dataset_id'])
                self.update_status('Error', df['dataset_id'], traceback.format_exc())
        return self.failed_dataset_ids

    def _transferred_dataset_ids(self):
        '''Return the ids of the datasets whose copy did not fail, in order.'''
        failed = getattr(self, 'failed_dataset_ids', set())
        return [df['dataset_id'] for df in self.dataset_files
                if df['dataset_id'] not in failed]

    def _update_transferred_status(self, status):
        '''Set status for the successfully copied datasets only.'''
        for dataset_id in self._transferred_dataset_ids():
            self.update_status(status, dataset_id)

    def add_to_library(self):
        '''
        Add the files in the staging directory to the target data library and
        folder through the running Galaxy server's web interface.

        Only the successfully copied datasets are marked ADD_TO_LIBRARY.  Any
        failure is reported through error_and_exit(), which terminates the
        process.
        '''
        try:
            self._update_transferred_status(Sample.transfer_status.ADD_TO_LIBRARY)
            log.debug("dir:%s, lib:%s, folder:%s" % (self.server_dir, str(self.library_id), str(self.folder_id)))
            galaxyweb = GalaxyWebInterface(self.server_host, self.server_port,
                                           self.datatx_email, self.datatx_password,
                                           self.config_id_secret)
            retval = galaxyweb.add_to_library(self.server_dir, self.library_id, self.folder_id)
            log.debug(str(retval))
            galaxyweb.logout()
        except Exception:
            # sys.exc_info() instead of "except Exception, e" keeps this valid
            # on both old Python 2 releases and Python 3.
            e = sys.exc_info()[1]
            log.debug(e)
            self.error_and_exit(str(e))

    def update_status(self, status, dataset_id='All', msg=''):
        '''
        Store status (and msg) in the database for one dataset, or for every
        dataset of the sample when dataset_id is 'All'.

        Any database failure is logged and terminates the process with
        status 1.
        '''
        try:
            log.debug('Setting status "%s" for dataset "%s" of sample "%s"' % (status, str(dataset_id), str(self.sample_id)))
            if dataset_id == 'All':
                for dataset in self.dataset_files:
                    self.galaxydb.set_sample_dataset_status(dataset['dataset_id'], status, msg)
            else:
                self.galaxydb.set_sample_dataset_status(dataset_id, status, msg)
            log.debug('done.')
        except Exception:
            log.error(traceback.format_exc())
            log.error('FATAL ERROR')
            sys.exit(1)

    def _node_text(self, node):
        '''Concatenate the data of node's direct text-node children.'''
        return ''.join([child.data for child in node.childNodes
                        if child.nodeType == child.TEXT_NODE])

    def get_value(self, dom, tag_name):
        '''
        Return the text of the first <tag_name> element in the XML message.
        Raises IndexError if there is no such element.
        '''
        return self._node_text(dom.getElementsByTagName(tag_name)[0])

    def get_value_index(self, dom, tag_name, dataset_id):
        '''
        Return the text of the <tag_name> element at position dataset_id
        (a 0-based index into all such elements, not a database id), or None
        if there are not that many elements.
        '''
        try:
            node = dom.getElementsByTagName(tag_name)[dataset_id]
        except IndexError:
            return None
        return self._node_text(node)
233
if __name__ == '__main__':
    # Do not log sys.argv: argv[1] is the XML job message, which contains
    # the sequencer password.
    log.info('STARTING %i %s' % (os.getpid(), sys.argv[0]))
    # Two positional arguments are required: the XML message and the id secret.
    if len(sys.argv) < 3:
        log.error('FATAL ERROR. Expected 2 arguments, got %i' % (len(sys.argv) - 1))
        sys.stderr.write('usage: python data_transfer.py <data_transfer_xml> <config_id_secret>\n')
        sys.exit(1)
    dt = DataTransfer(sys.argv[1], sys.argv[2])
    # start() exits the process itself; the exit below is only a safety net.
    dt.start()
    sys.exit(0)
242
Note: リポジトリブラウザについてのヘルプは TracBrowser を参照してください。