import logging
import subprocess
from Queue import Queue
import threading

from galaxy import model
from galaxy.datatypes.data import nice_size

import os, errno
from time import sleep

log = logging.getLogger( __name__ )

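# Runs jobs locally: each queued job's command line is executed as a
# subprocess on the Galaxy server host by one of the worker threads below.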
class LocalJobRunner( object ):
    """
    Job runner backed by a finite pool of worker threads; jobs are
    scheduled FIFO.
    """
    STOP_SIGNAL = object()
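    # STOP_SIGNAL is compared by identity in run_next(), so a plain object()
    # can never collide with a real job_wrapper placed on the queue.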
    def __init__( self, app ):
        """Start the job runner with 'nworkers' worker threads"""
        self.app = app
        self.sa_session = app.model.context
        # put lib into the PYTHONPATH for subprocesses
        if 'PYTHONPATH' in os.environ:
            os.environ['PYTHONPATH'] = '%s:%s' % ( os.environ['PYTHONPATH'], os.path.abspath( 'lib' ) )
        else:
            os.environ['PYTHONPATH'] = os.path.abspath( 'lib' )
        # start workers
        self.queue = Queue()
        self.threads = []
        nworkers = app.config.local_job_queue_workers
        log.info( "starting workers" )
        for i in range( nworkers ):
            worker = threading.Thread( target=self.run_next )
            worker.start()
            self.threads.append( worker )
        log.debug( "%d workers ready", nworkers )

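    # Each worker thread blocks on the shared Queue in run_next(); FIFO
    # scheduling falls out of the Queue's ordering.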
    def run_next( self ):
        """Run the next job, waiting until one is available if necessary"""
        while 1:
            job_wrapper = self.queue.get()
            if job_wrapper is self.STOP_SIGNAL:
                return
            try:
                self.run_job( job_wrapper )
            except:
                log.exception( "Uncaught exception running job" )

    def run_job( self, job_wrapper ):
        job_wrapper.set_runner( 'local:///', None )
        stderr = stdout = command_line = ''
        # Prepare the job to run
        try:
            job_wrapper.prepare()
            command_line = job_wrapper.get_command_line()
            if job_wrapper.dependency_shell_commands:
                command_line = "; ".join( job_wrapper.dependency_shell_commands + [ command_line ] )
        except:
            job_wrapper.fail( "failure preparing job", exception=True )
            log.exception( "failure preparing job %d" % job_wrapper.job_id )
            return
        # If we were able to get a command line, run the job
        if command_line:
            try:
                log.debug( 'executing: %s' % command_line )
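                # Run the command through a shell in its own process group
                # (preexec_fn=os.setpgrp) so that os.killpg() below can
                # terminate the tool and any child processes it spawns.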
                proc = subprocess.Popen( args = command_line,
                                         shell = True,
                                         cwd = job_wrapper.working_directory,
                                         stdout = subprocess.PIPE,
                                         stderr = subprocess.PIPE,
                                         env = os.environ,
                                         preexec_fn = os.setpgrp )
                job_wrapper.set_runner( 'local:///', proc.pid )
                job_wrapper.change_state( model.Job.states.RUNNING )
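                # While the job runs, watch its output files and kill the
                # job if any output exceeds the configured size limit.
                # Polling backs off exponentially (1s doubling to 8s) so
                # long-running jobs aren't stat()ed every second.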
                if self.app.config.output_size_limit > 0:
                    sleep_time = 1
                    while proc.poll() is None:
                        for outfile, size in job_wrapper.check_output_sizes():
                            if size > self.app.config.output_size_limit:
                                # Error the job immediately
                                job_wrapper.fail( 'Job output grew too large (greater than %s), please try different job parameters' \
                                    % nice_size( self.app.config.output_size_limit ) )
                                log.warning( 'Terminating job %s due to output %s growing larger than %s limit' \
                                    % ( job_wrapper.job_id, os.path.basename( outfile ), nice_size( self.app.config.output_size_limit ) ) )
                                # Then kill it: TERM first, KILL if it's still alive a second later
                                os.killpg( proc.pid, 15 )
                                sleep( 1 )
                                if proc.poll() is None:
                                    os.killpg( proc.pid, 9 )
                                proc.wait() # reap
                                log.debug( 'Job %s (pid %s) terminated' % ( job_wrapper.job_id, proc.pid ) )
                                return
                        sleep( sleep_time )
                        if sleep_time < 8:
                            # So we don't stat every second
                            sleep_time *= 2
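                # Note: stdout/stderr are only drained after the child has
                # exited.  This assumes tools write less than the OS pipe
                # buffer to each stream; a child that fills a PIPE before
                # exiting could block, since nothing reads the pipes while
                # it runs.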
                stdout = proc.stdout.read()
                stderr = proc.stderr.read()
                proc.wait() # reap
                log.debug( 'execution finished: %s' % command_line )
            except Exception:
                job_wrapper.fail( "failure running job", exception=True )
                log.exception( "failure running job %d" % job_wrapper.job_id )
                return
        # Run the external metadata setting script here.  This process is
        # terminate-able when the output dataset/job is deleted, so that
        # long-running set_meta() calls can be cancelled without having to
        # restart the server.
        if job_wrapper.get_state() not in [ model.Job.states.ERROR, model.Job.states.DELETED ] and self.app.config.set_metadata_externally and job_wrapper.output_paths:
            external_metadata_script = job_wrapper.setup_external_metadata( output_fnames = job_wrapper.get_output_fnames(),
                                                                            set_extension = True,
                                                                            kwds = { 'overwrite' : False } ) # we don't want to overwrite metadata that was copied over in init_meta(), as per established behavior
            log.debug( 'executing external set_meta script for job %d: %s' % ( job_wrapper.job_id, external_metadata_script ) )
            external_metadata_proc = subprocess.Popen( args = external_metadata_script,
                                                       shell = True,
                                                       env = os.environ,
                                                       preexec_fn = os.setpgrp )
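            # Record the metadata script's PID so that stop_job() can signal
            # its process group if the job is deleted while metadata is
            # still being set.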
            job_wrapper.external_output_metadata.set_job_runner_external_pid( external_metadata_proc.pid, self.sa_session )
            external_metadata_proc.wait()
            log.debug( 'execution of external set_meta finished for job %d' % job_wrapper.job_id )

        # Finish the job
        try:
            job_wrapper.finish( stdout, stderr )
        except:
            log.exception( "Job wrapper finish method failed" )
            job_wrapper.fail( "Unable to finish job", exception=True )

    def put( self, job_wrapper ):
        """Add a job to the queue (by job identifier)"""
        # Change to queued state before handing to worker thread so the runner won't pick it up again
        job_wrapper.change_state( model.Job.states.QUEUED )
        self.queue.put( job_wrapper )

    def shutdown( self ):
        """Attempts to gracefully shut down the worker threads"""
        log.info( "sending stop signal to worker threads" )
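        # Queue one STOP_SIGNAL per worker: each thread consumes exactly one
        # sentinel from the queue and returns from run_next().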
        for i in range( len( self.threads ) ):
            self.queue.put( self.STOP_SIGNAL )
        log.info( "local job runner stopped" )

    def check_pid( self, pid ):
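        # Signal 0 performs error checking only: it tests whether the PID
        # exists (and whether we may signal it) without delivering a signal.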
        try:
            os.kill( pid, 0 )
            return True
        except OSError, e:
            if e.errno == errno.ESRCH:
                log.debug( "check_pid(): PID %d is dead" % pid )
            else:
                log.warning( "check_pid(): Got errno %s when attempting to check PID %d: %s" % ( errno.errorcode[e.errno], pid, e.strerror ) )
            return False

    def stop_job( self, job ):
        # If our local job has JobExternalOutputMetadata associated, then the
        # primary job must already have finished, so the PID to stop belongs
        # to the external metadata script.
        if job.external_output_metadata:
            pid = job.external_output_metadata[0].job_runner_external_pid # every JobExternalOutputMetadata has a pid set, we just need to take it from one of them
        else:
            pid = job.job_runner_external_id
        if pid in [ None, '' ]:
            log.warning( "stop_job(): %s: no PID in database for job, unable to stop" % job.id )
            return
        pid = int( pid )
        if not self.check_pid( pid ):
            log.warning( "stop_job(): %s: PID %d was already dead or can't be signaled" % ( job.id, pid ) )
            return
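        # Escalate: send SIGTERM (15) first for a graceful shutdown, then
        # SIGKILL (9) if the process group is still alive two seconds later.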
        for sig in [ 15, 9 ]:
            try:
                os.killpg( pid, sig )
            except OSError, e:
                log.warning( "stop_job(): %s: Got errno %s when attempting to signal %d to PID %d: %s" % ( job.id, errno.errorcode[e.errno], sig, pid, e.strerror ) )
                return # give up
            sleep( 2 )
            if not self.check_pid( pid ):
                log.debug( "stop_job(): %s: PID %d successfully killed with signal %d" % ( job.id, pid, sig ) )
                return
        else:
            log.warning( "stop_job(): %s: PID %d refuses to die after signaling TERM/KILL" % ( job.id, pid ) )

    def recover( self, job, job_wrapper ):
        # local jobs can't be recovered
        job_wrapper.change_state( model.Job.states.ERROR, info = "This job was killed when Galaxy was restarted. Please retry the job." )