root/galaxy-central/lib/galaxy/jobs/runners/local.py

リビジョン 2, 9.0 KB (コミッタ: hatakeyama, 14 年 前)

import galaxy-central

行番号 
1import logging
2import subprocess
3from Queue import Queue
4import threading
5
6from galaxy import model
7from galaxy.datatypes.data import nice_size
8
9import os, errno
10from time import sleep
11
12log = logging.getLogger( __name__ )
13
class LocalJobRunner( object ):
    """
    Job runner backed by a finite pool of worker threads. FIFO scheduling.

    Jobs are handed over with put(); every worker thread blocks on the shared
    queue in run_next() and executes one job at a time as a local shell
    subprocess via run_job().
    """
    # Sentinel placed on the queue (once per worker) to make run_next() return
    STOP_SIGNAL = object()

    def __init__( self, app ):
        """
        Start the job runner with 'app.config.local_job_queue_workers' worker threads.

        Also appends the absolute path of 'lib' to the PYTHONPATH environment
        variable so that subprocesses started by this runner inherit it.
        """
        self.app = app
        self.sa_session = app.model.context
        # put lib into the PYTHONPATH for subprocesses
        if 'PYTHONPATH' in os.environ:
            os.environ['PYTHONPATH'] = '%s:%s' % ( os.environ['PYTHONPATH'], os.path.abspath( 'lib' ) )
        else:
            os.environ['PYTHONPATH'] = os.path.abspath( 'lib' )
        # start workers
        self.queue = Queue()
        self.threads = []
        nworkers = app.config.local_job_queue_workers
        log.info( "starting workers" )
        for i in range( nworkers ):
            worker = threading.Thread( target=self.run_next )
            worker.start()
            self.threads.append( worker )
        log.debug( "%d workers ready", nworkers )

    def run_next( self ):
        """
        Run the next job, waiting until one is available if necessary.

        Loops until STOP_SIGNAL is taken from the queue, then returns.
        """
        while True:
            job_wrapper = self.queue.get()
            if job_wrapper is self.STOP_SIGNAL:
                return
            try:
                self.run_job( job_wrapper )
            except:
                # Deliberately catch everything: a failing job must never take
                # the worker thread down with it.
                log.exception( "Uncaught exception running job" )

    def run_job( self, job_wrapper ):
        """
        Prepare and execute a single job, then hand its output to the wrapper.

        Steps:
          1. Build the command line (prefixed with any dependency shell
             commands); on failure the job is failed and we return.
          2. Run the command through the shell in its own process group. When
             'output_size_limit' is positive, poll the output sizes while the
             job runs and kill the process group if an output exceeds the limit.
          3. Optionally run the external metadata script.
          4. Call job_wrapper.finish() with the captured stdout and stderr.
        """
        job_wrapper.set_runner( 'local:///', None )
        stderr = stdout = command_line = ''
        # Prepare the job to run
        try:
            job_wrapper.prepare()
            command_line = job_wrapper.get_command_line()
            if job_wrapper.dependency_shell_commands:
                command_line = "; ".join( job_wrapper.dependency_shell_commands + [ command_line ] )
        except:
            job_wrapper.fail( "failure preparing job", exception=True )
            log.exception("failure running job %d" % job_wrapper.job_id)
            return
        # If we were able to get a command line, run the job
        if command_line:
            try:
                log.debug( 'executing: %s' % command_line )
                # os.setpgrp puts the child in its own process group so that the
                # whole group can later be signalled with os.killpg()
                proc = subprocess.Popen( args = command_line,
                                         shell = True,
                                         cwd = job_wrapper.working_directory,
                                         stdout = subprocess.PIPE,
                                         stderr = subprocess.PIPE,
                                         env = os.environ,
                                         preexec_fn = os.setpgrp )
                job_wrapper.set_runner( 'local:///', proc.pid )
                job_wrapper.change_state( model.Job.states.RUNNING )
                output_size_limit = self.app.config.output_size_limit
                if output_size_limit > 0:
                    # NOTE(review): stdout/stderr are pipes that are not drained
                    # while we poll here, so a job writing more than a pipe
                    # buffer's worth of output can stall until we stop polling.
                    sleep_time = 1
                    while proc.poll() is None:
                        for outfile, size in job_wrapper.check_output_sizes():
                            if size > output_size_limit:
                                # Error the job immediately
                                job_wrapper.fail( 'Job output grew too large (greater than %s), please try different job parameters' \
                                    % nice_size( output_size_limit ) )
                                log.warning( 'Terminating job %s due to output %s growing larger than %s limit' \
                                    % ( job_wrapper.job_id, os.path.basename( outfile ), nice_size( output_size_limit ) ) )
                                # Then kill it: TERM first, KILL if still alive a second later
                                os.killpg( proc.pid, 15 )
                                sleep( 1 )
                                if proc.poll() is None:
                                    os.killpg( proc.pid, 9 )
                                proc.wait() # reap
                                log.debug( 'Job %s (pid %s) terminated' % ( job_wrapper.job_id, proc.pid ) )
                                return
                        # Sleep once per polling pass (not once per output file),
                        # so the loop also waits when there are no outputs yet.
                        sleep( sleep_time )
                        if sleep_time < 8:
                            # So we don't stat every second
                            sleep_time *= 2
                # communicate() drains stdout and stderr together and reaps the
                # child; reading one pipe to EOF before the other can deadlock
                # when the child blocks on the pipe that is not being read.
                stdout, stderr = proc.communicate()
                log.debug('execution finished: %s' % command_line)
            except Exception:
                job_wrapper.fail( "failure running job", exception=True )
                log.exception("failure running job %d" % job_wrapper.job_id)
                return
        #run the metadata setting script here
        #this is terminate-able when output dataset/job is deleted
        #so that long running set_meta()s can be canceled without having to reboot the server
        if job_wrapper.get_state() not in [ model.Job.states.ERROR, model.Job.states.DELETED ] and self.app.config.set_metadata_externally and job_wrapper.output_paths:
            external_metadata_script = job_wrapper.setup_external_metadata( output_fnames = job_wrapper.get_output_fnames(),
                                                                            set_extension = True,
                                                                            kwds = { 'overwrite' : False } ) #we don't want to overwrite metadata that was copied over in init_meta(), as per established behavior
            log.debug( 'executing external set_meta script for job %d: %s' % ( job_wrapper.job_id, external_metadata_script ) )
            external_metadata_proc = subprocess.Popen( args = external_metadata_script,
                                         shell = True,
                                         env = os.environ,
                                         preexec_fn = os.setpgrp )
            # Record the pid so that stop_job() can find and signal this process
            job_wrapper.external_output_metadata.set_job_runner_external_pid( external_metadata_proc.pid, self.sa_session )
            external_metadata_proc.wait()
            log.debug( 'execution of external set_meta finished for job %d' % job_wrapper.job_id )

        # Finish the job
        try:
            job_wrapper.finish( stdout, stderr )
        except:
            log.exception("Job wrapper finish method failed")
            job_wrapper.fail("Unable to finish job", exception=True)

    def put( self, job_wrapper ):
        """Add a job to the queue (by job wrapper)"""
        # Change to queued state before handing to worker thread so the runner won't pick it up again
        job_wrapper.change_state( model.Job.states.QUEUED )
        self.queue.put( job_wrapper )

    def shutdown( self ):
        """
        Attempts to gracefully shut down the worker threads.

        Queues one STOP_SIGNAL per worker thread; does not wait for the
        threads to exit.
        """
        log.info( "sending stop signal to worker threads" )
        for worker in self.threads:
            self.queue.put( self.STOP_SIGNAL )
        log.info( "local job runner stopped" )

    def check_pid( self, pid ):
        """
        Return True if signal 0 can be sent to 'pid', False otherwise.

        A False result is logged at debug level when the process does not
        exist (ESRCH) and as a warning for any other OSError.
        """
        try:
            os.kill( pid, 0 )
            return True
        except OSError as e:
            if e.errno == errno.ESRCH:
                log.debug( "check_pid(): PID %d is dead" % pid )
            else:
                log.warning( "check_pid(): Got errno %s when attempting to check PID %d: %s" %( errno.errorcode[e.errno], pid, e.strerror ) )
            return False

    def stop_job( self, job ):
        """
        Stop a running job by signalling its process group.

        Sends TERM and then KILL (signals 15 and 9), waiting two seconds after
        each, and returns as soon as the PID can no longer be signalled. Gives
        up with a warning if no PID is recorded, if the PID is already gone, or
        if sending a signal fails.
        """
        #if our local job has JobExternalOutputMetadata associated, then our primary job has to have already finished
        if job.external_output_metadata:
            pid = job.external_output_metadata[0].job_runner_external_pid #every JobExternalOutputMetadata has a pid set, we just need to take from one of them
        else:
            pid = job.job_runner_external_id
        if pid in [ None, '' ]:
            log.warning( "stop_job(): %s: no PID in database for job, unable to stop" % job.id )
            return
        pid = int( pid )
        if not self.check_pid( pid ):
            log.warning( "stop_job(): %s: PID %d was already dead or can't be signaled" % ( job.id, pid ) )
            return
        for sig in [ 15, 9 ]:
            try:
                os.killpg( pid, sig )
            except OSError as e:
                log.warning( "stop_job(): %s: Got errno %s when attempting to signal %d to PID %d: %s" % ( job.id, errno.errorcode[e.errno], sig, pid, e.strerror ) )
                return  # give up
            sleep( 2 )
            if not self.check_pid( pid ):
                log.debug( "stop_job(): %s: PID %d successfully killed with signal %d" %( job.id, pid, sig ) )
                return
        # Only reached when both signals were delivered and the PID is still alive
        log.warning( "stop_job(): %s: PID %d refuses to die after signaling TERM/KILL" %( job.id, pid ) )

    def recover( self, job, job_wrapper ):
        """Mark the job as errored; this runner does not resume jobs after a restart."""
        # local jobs can't be recovered
        job_wrapper.change_state( model.Job.states.ERROR, info = "This job was killed when Galaxy was restarted.  Please retry the job." )
182
Note: リポジトリブラウザについてのヘルプは TracBrowser を参照してください。