root/galaxy-central/lib/galaxy/jobs/runners/drmaa.py

リビジョン 2, 14.6 KB (コミッタ: hatakeyama, 14 年 前)

import galaxy-central

行番号 
1import os, logging, threading, time
2from Queue import Queue, Empty
3
4from galaxy import model
5from paste.deploy.converters import asbool
6
7import pkg_resources
8
9try:
10    pkg_resources.require( "drmaa" )
11    drmaa = __import__( "drmaa" )
12except Exception, e:
13    drmaa = str( e )
14
15log = logging.getLogger( __name__ )
16
17if type( drmaa ) != str:
18    drmaa_state = {
19        drmaa.JobState.UNDETERMINED: 'process status cannot be determined',
20        drmaa.JobState.QUEUED_ACTIVE: 'job is queued and active',
21        drmaa.JobState.SYSTEM_ON_HOLD: 'job is queued and in system hold',
22        drmaa.JobState.USER_ON_HOLD: 'job is queued and in user hold',
23        drmaa.JobState.USER_SYSTEM_ON_HOLD: 'job is queued and in user and system hold',
24        drmaa.JobState.RUNNING: 'job is running',
25        drmaa.JobState.SYSTEM_SUSPENDED: 'job is system suspended',
26        drmaa.JobState.USER_SUSPENDED: 'job is user suspended',
27        drmaa.JobState.DONE: 'job finished normally',
28        drmaa.JobState.FAILED: 'job finished, but failed',
29    }
30
31drm_template = """#!/bin/sh
32#$ -S /bin/sh
33GALAXY_LIB="%s"
34if [ "$GALAXY_LIB" != "None" ]; then
35    if [ -n "$PYTHONPATH" ]; then
36        PYTHONPATH="$GALAXY_LIB:$PYTHONPATH"
37    else
38        PYTHONPATH="$GALAXY_LIB"
39    fi
40    export PYTHONPATH
41fi
42cd %s
43%s
44"""
45
46class DRMAAJobState( object ):
47    def __init__( self ):
48        """
49        Encapsulates state related to a job that is being run via the DRM and
50        that we need to monitor.
51        """
52        self.job_wrapper = None
53        self.job_id = None
54        self.old_state = None
55        self.running = False
56        self.job_file = None
57        self.ofile = None
58        self.efile = None
59        self.runner_url = None
60
61class DRMAAJobRunner( object ):
62    """
63    Job runner backed by a finite pool of worker threads. FIFO scheduling
64    """
65    STOP_SIGNAL = object()
66    def __init__( self, app ):
67        """Initialize this job runner and start the monitor thread"""
68        # Check if drmaa was importable, fail if not
69        if type( drmaa ) == str:
70            raise Exception( "DRMAAJobRunner requires drmaa module which could not be loaded: %s" % drmaa )
71        self.app = app
72        self.sa_session = app.model.context
73        # 'watched' and 'queue' are both used to keep track of jobs to watch.
74        # 'queue' is used to add new watched jobs, and can be called from
75        # any thread (usually by the 'queue_job' method). 'watched' must only
76        # be modified by the monitor thread, which will move items from 'queue'
77        # to 'watched' and then manage the watched jobs.
78        self.watched = []
79        self.monitor_queue = Queue()
80        self.ds = drmaa.Session()
81        self.ds.initialize()
82        self.monitor_thread = threading.Thread( target=self.monitor )
83        self.monitor_thread.start()
84        self.work_queue = Queue()
85        self.work_threads = []
86        nworkers = app.config.cluster_job_queue_workers
87        for i in range( nworkers ):
88            worker = threading.Thread( target=self.run_next )
89            worker.start()
90            self.work_threads.append( worker )
91        log.debug( "%d workers ready" % nworkers )
92
93    def get_native_spec( self, url ):
94        """Get any native DRM arguments specified by the site configuration"""
95        try:
96            return url.split('/')[2] or None
97        except:
98            return None
99
100    def run_next( self ):
101        """
102        Run the next item in the queue (a job waiting to run or finish )
103        """
104        while 1:
105            ( op, obj ) = self.work_queue.get()
106            if op is self.STOP_SIGNAL:
107                return
108            try:
109                if op == 'queue':
110                    self.queue_job( obj )
111                elif op == 'finish':
112                    self.finish_job( obj )
113                elif op == 'fail':
114                    self.fail_job( obj )
115            except:
116                log.exception( "Uncaught exception %sing job" % op )
117
118    def queue_job( self, job_wrapper ):
119        """Create job script and submit it to the DRM"""
120
121        try:
122            job_wrapper.prepare()
123            command_line = job_wrapper.get_command_line()
124        except:
125            job_wrapper.fail( "failure preparing job", exception=True )
126            log.exception("failure running job %d" % job_wrapper.job_id)
127            return
128
129        runner_url = job_wrapper.tool.job_runner
130       
131        # This is silly, why would we queue a job with no command line?
132        if not command_line:
133            job_wrapper.finish( '', '' )
134            return
135       
136        # Check for deletion before we change state
137        if job_wrapper.get_state() == model.Job.states.DELETED:
138            log.debug( "Job %s deleted by user before it entered the queue" % job_wrapper.job_id )
139            job_wrapper.cleanup()
140            return
141
142        # Change to queued state immediately
143        job_wrapper.change_state( model.Job.states.QUEUED )
144
145        # define job attributes
146        ofile = "%s/database/pbs/%s.o" % (os.getcwd(), job_wrapper.job_id)
147        efile = "%s/database/pbs/%s.e" % (os.getcwd(), job_wrapper.job_id)
148        jt = self.ds.createJobTemplate()
149        jt.remoteCommand = "%s/database/pbs/galaxy_%s.sh" % (os.getcwd(), job_wrapper.job_id)
150        jt.outputPath = ":%s" % ofile
151        jt.errorPath = ":%s" % efile
152        native_spec = self.get_native_spec( runner_url )
153        if native_spec is not None:
154            jt.nativeSpecification = native_spec
155
156        script = drm_template % (job_wrapper.galaxy_lib_dir, os.path.abspath( job_wrapper.working_directory ), command_line)
157        if self.app.config.set_metadata_externally:
158            script += "cd %s\n" % os.path.abspath( os.getcwd() )
159            script += "%s\n" % job_wrapper.setup_external_metadata( exec_dir = os.path.abspath( os.getcwd() ),
160                                                                    tmp_dir = self.app.config.new_file_path,
161                                                                    dataset_files_path = self.app.model.Dataset.file_path,
162                                                                    output_fnames = job_wrapper.get_output_fnames(),
163                                                                    set_extension = False,
164                                                                    kwds = { 'overwrite' : False } ) #we don't want to overwrite metadata that was copied over in init_meta(), as per established behavior
165        fh = file( jt.remoteCommand, "w" )
166        fh.write( script )
167        fh.close()
168        os.chmod( jt.remoteCommand, 0750 )
169
170        # job was deleted while we were preparing it
171        if job_wrapper.get_state() == model.Job.states.DELETED:
172            log.debug( "Job %s deleted by user before it entered the queue" % job_wrapper.job_id )
173            self.cleanup( ( ofile, efile, jt.remoteCommand ) )
174            job_wrapper.cleanup()
175            return
176
177        galaxy_job_id = job_wrapper.job_id
178        log.debug("(%s) submitting file %s" % ( galaxy_job_id, jt.remoteCommand ) )
179        log.debug("(%s) command is: %s" % ( galaxy_job_id, command_line ) )
180        # runJob will raise if there's a submit problem
181        job_id = self.ds.runJob(jt)
182        log.info("(%s) queued as %s" % ( galaxy_job_id, job_id ) )
183
184        # store runner information for tracking if Galaxy restarts
185        job_wrapper.set_runner( runner_url, job_id )
186
187        # Store DRM related state information for job
188        drm_job_state = DRMAAJobState()
189        drm_job_state.job_wrapper = job_wrapper
190        drm_job_state.job_id = job_id
191        drm_job_state.ofile = ofile
192        drm_job_state.efile = efile
193        drm_job_state.job_file = jt.remoteCommand
194        drm_job_state.old_state = 'new'
195        drm_job_state.running = False
196        drm_job_state.runner_url = runner_url
197       
198        # delete the job template
199        self.ds.deleteJobTemplate( jt )
200
201        # Add to our 'queue' of jobs to monitor
202        self.monitor_queue.put( drm_job_state )
203
204    def monitor( self ):
205        """
206        Watches jobs currently in the PBS queue and deals with state changes
207        (queued to running) and job completion
208        """
209        while 1:
210            # Take any new watched jobs and put them on the monitor list
211            try:
212                while 1:
213                    drm_job_state = self.monitor_queue.get_nowait()
214                    if drm_job_state is self.STOP_SIGNAL:
215                        # TODO: This is where any cleanup would occur
216                        self.ds.exit()
217                        return
218                    self.watched.append( drm_job_state )
219            except Empty:
220                pass
221            # Iterate over the list of watched jobs and check state
222            self.check_watched_items()
223            # Sleep a bit before the next state check
224            time.sleep( 1 )
225           
226    def check_watched_items( self ):
227        """
228        Called by the monitor thread to look at each watched job and deal
229        with state changes.
230        """
231        new_watched = []
232        for drm_job_state in self.watched:
233            job_id = drm_job_state.job_id
234            galaxy_job_id = drm_job_state.job_wrapper.job_id
235            old_state = drm_job_state.old_state
236            try:
237                state = self.ds.jobStatus( job_id )
238            except drmaa.InvalidJobException:
239                # we should only get here if an orphaned job was put into the queue at app startup
240                log.debug("(%s/%s) job left DRM queue" % ( galaxy_job_id, job_id ) )
241                self.work_queue.put( ( 'finish', drm_job_state ) )
242                continue
243            except Exception, e:
244                # so we don't kill the monitor thread
245                log.exception("(%s/%s) Unable to check job status" % ( galaxy_job_id, job_id ) )
246                log.warning("(%s/%s) job will now be errored" % ( galaxy_job_id, job_id ) )
247                drm_job_state.fail_message = "Cluster could not complete job"
248                self.work_queue.put( ( 'fail', drm_job_state ) )
249                continue
250            if state != old_state:
251                log.debug("(%s/%s) state change: %s" % ( galaxy_job_id, job_id, drmaa_state[state] ) )
252            if state == drmaa.JobState.RUNNING and not drm_job_state.running:
253                drm_job_state.running = True
254                drm_job_state.job_wrapper.change_state( model.Job.states.RUNNING )
255            if state in ( drmaa.JobState.DONE, drmaa.JobState.FAILED ):
256                self.work_queue.put( ( 'finish', drm_job_state ) )
257                continue
258            drm_job_state.old_state = state
259            new_watched.append( drm_job_state )
260        # Replace the watch list with the updated version
261        self.watched = new_watched
262       
263    def finish_job( self, drm_job_state ):
264        """
265        Get the output/error for a finished job, pass to `job_wrapper.finish`
266        and cleanup all the DRM temporary files.
267        """
268        ofile = drm_job_state.ofile
269        efile = drm_job_state.efile
270        job_file = drm_job_state.job_file
271        # collect the output
272        try:
273            ofh = file(ofile, "r")
274            efh = file(efile, "r")
275            stdout = ofh.read()
276            stderr = efh.read()
277        except:
278            stdout = ''
279            stderr = 'Job output not returned from cluster'
280            log.debug(stderr)
281
282        try:
283            drm_job_state.job_wrapper.finish( stdout, stderr )
284        except:
285            log.exception("Job wrapper finish method failed")
286
287        # clean up the drm files
288        self.cleanup( ( ofile, efile, job_file ) )
289
290    def fail_job( self, drm_job_state ):
291        """
292        Seperated out so we can use the worker threads for it.
293        """
294        self.stop_job( self.sa_session.query( self.app.model.Job ).get( drm_job_state.job_wrapper.job_id ) )
295        drm_job_state.job_wrapper.fail( drm_job_state.fail_message )
296        self.cleanup( ( drm_job_state.ofile, drm_job_state.efile, drm_job_state.job_file ) )
297
298    def cleanup( self, files ):
299        if not asbool( self.app.config.get( 'debug', False ) ):
300            for file in files:
301                if os.access( file, os.R_OK ):
302                    os.unlink( file )
303
304    def put( self, job_wrapper ):
305        """Add a job to the queue (by job identifier)"""
306        # Change to queued state before handing to worker thread so the runner won't pick it up again
307        job_wrapper.change_state( model.Job.states.QUEUED )
308        self.work_queue.put( ( 'queue', job_wrapper ) )
309
310    def shutdown( self ):
311        """Attempts to gracefully shut down the monitor thread"""
312        log.info( "sending stop signal to worker threads" )
313        self.monitor_queue.put( self.STOP_SIGNAL )
314        for i in range( len( self.work_threads ) ):
315            self.work_queue.put( ( self.STOP_SIGNAL, None ) )
316        log.info( "drmaa job runner stopped" )
317
318    def stop_job( self, job ):
319        """Attempts to delete a job from the DRM queue"""
320        try:
321            self.ds.control( job.job_runner_external_id, drmaa.JobControlAction.TERMINATE )
322            log.debug( "(%s/%s) Removed from DRM queue at user's request" % ( job.id, job.job_runner_external_id ) )
323        except drmaa.InvalidJobException:
324            log.debug( "(%s/%s) User killed running job, but it was already dead" % ( job.id, job.job_runner_external_id ) )
325        except Exception, e:
326            log.debug( "(%s/%s) User killed running job, but error encountered removing from DRM queue: %s" % ( job.id, job.job_runner_external_id, e ) )
327
328    def recover( self, job, job_wrapper ):
329        """Recovers jobs stuck in the queued/running state when Galaxy started"""
330        drm_job_state = DRMAAJobState()
331        drm_job_state.ofile = "%s/database/pbs/%s.o" % (os.getcwd(), job.id)
332        drm_job_state.efile = "%s/database/pbs/%s.e" % (os.getcwd(), job.id)
333        drm_job_state.job_file = "%s/database/pbs/galaxy_%s.sh" % (os.getcwd(), job.id)
334        drm_job_state.job_id = str( job.job_runner_external_id )
335        drm_job_state.runner_url = job_wrapper.tool.job_runner
336        job_wrapper.command_line = job.command_line
337        drm_job_state.job_wrapper = job_wrapper
338        if job.state == model.Job.states.RUNNING:
339            log.debug( "(%s/%s) is still in running state, adding to the DRM queue" % ( job.id, job.job_runner_external_id ) )
340            drm_job_state.old_state = drmaa.JobState.RUNNING
341            drm_job_state.running = True
342            self.monitor_queue.put( drm_job_state )
343        elif job.state == model.Job.states.QUEUED:
344            log.debug( "(%s/%s) is still in DRM queued state, adding to the DRM queue" % ( job.id, job.job_runner_external_id ) )
345            drm_job_state.old_state = drmaa.JobState.QUEUED_ACTIVE
346            drm_job_state.running = False
347            self.monitor_queue.put( drm_job_state )
Note: リポジトリブラウザについてのヘルプは TracBrowser を参照してください。