import os, logging, threading, time
from Queue import Queue, Empty

from galaxy import model
from paste.deploy.converters import asbool

import pkg_resources

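# DRMAA_python is an optional dependency: if it cannot be loaded, DRMAA is left
# as None and SGEJobRunner.__init__ below raises rather than starting.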
try:
    pkg_resources.require( "DRMAA_python" )
    DRMAA = __import__( "DRMAA" )
except:
    DRMAA = None

log = logging.getLogger( __name__ )

if DRMAA is not None:
    DRMAA_state = {
        DRMAA.Session.UNDETERMINED: 'process status cannot be determined',
        DRMAA.Session.QUEUED_ACTIVE: 'job is queued and waiting to be scheduled',
        DRMAA.Session.SYSTEM_ON_HOLD: 'job is queued and in system hold',
        DRMAA.Session.USER_ON_HOLD: 'job is queued and in user hold',
        DRMAA.Session.USER_SYSTEM_ON_HOLD: 'job is queued and in user and system hold',
        DRMAA.Session.RUNNING: 'job is running',
        DRMAA.Session.SYSTEM_SUSPENDED: 'job is system suspended',
        DRMAA.Session.USER_SUSPENDED: 'job is user suspended',
        DRMAA.Session.DONE: 'job finished normally',
        DRMAA.Session.FAILED: 'job finished, but failed',
    }

sge_template = """#!/bin/sh
#$ -S /bin/sh
GALAXY_LIB="%s"
if [ "$GALAXY_LIB" != "None" ]; then
    if [ -n "$PYTHONPATH" ]; then
        PYTHONPATH="$GALAXY_LIB:$PYTHONPATH"
    else
        PYTHONPATH="$GALAXY_LIB"
    fi
    export PYTHONPATH
fi
cd %s
%s
"""
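# The three %s placeholders in sge_template are filled in by queue_job() with,
# in order, the Galaxy library directory (prepended to PYTHONPATH by the
# script), the job's working directory, and the tool command line.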

class SGEJobState( object ):
    def __init__( self ):
        """
        Encapsulates state related to a job that is being run via SGE and
        that we need to monitor.
        """
        self.job_wrapper = None
        self.job_id = None
        self.old_state = None
        self.running = False
        self.job_file = None
        self.ofile = None
        self.efile = None
        self.runner_url = None

class SGEJobRunner( object ):
    """
    Job runner backed by a finite pool of worker threads. FIFO scheduling.
    """
    STOP_SIGNAL = object()
    def __init__( self, app ):
        """Initialize this job runner and start the monitor thread"""
        # Check if SGE was importable, fail if not
        if DRMAA is None:
            raise Exception( "SGEJobRunner requires DRMAA_python which was not found" )
        self.app = app
        self.sa_session = app.model.context
        # 'watched' and 'queue' are both used to keep track of jobs to watch.
        # 'queue' is used to add new watched jobs, and can be called from
        # any thread (usually by the 'queue_job' method). 'watched' must only
        # be modified by the monitor thread, which will move items from 'queue'
        # to 'watched' and then manage the watched jobs.
        self.watched = []
        self.monitor_queue = Queue()
        self.default_cell = self.determine_sge_cell( self.app.config.default_cluster_job_runner )
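        # A single DRMAA session, bound to the default cell, is shared by all
        # job submission, status polling, and termination calls in this runner.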
        self.ds = DRMAA.Session()
        self.ds.init( self.default_cell )
        self.monitor_thread = threading.Thread( target=self.monitor )
        self.monitor_thread.start()
        self.work_queue = Queue()
        self.work_threads = []
        nworkers = app.config.cluster_job_queue_workers
        for i in range( nworkers ):
            worker = threading.Thread( target=self.run_next )
            worker.start()
            self.work_threads.append( worker )
        log.debug( "%d workers ready" % nworkers )

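    # Runner URLs are expected to look like sge://cell/queue/project/extra_params,
    # with any trailing component omitted.  The helpers below pull the pieces
    # out of the URL by position; for an illustrative URL 'sge:///all.q',
    # url.split('/') gives ['sge:', '', '', 'all.q'], so the queue is element 3
    # and the cell (element 2) is empty, meaning the default cell.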
    def determine_sge_cell( self, url ):
        """Determine what SGE cell we are using"""
        url_split = url.split("/")
        if url_split[0] == 'sge:':
            return url_split[2]
        # this could happen if sge is started, but is not the default runner
        else:
            return ''

    def determine_sge_queue( self, url ):
        """Determine what SGE queue we are submitting to"""
        try:
            return url.split('/')[3] or None
        except:
            return None

    def determine_sge_project( self, url ):
        """Determine what SGE project we are submitting to"""
        try:
            return url.split('/')[4] or None
        except:
            return None

    def determine_sge_tool_parameters( self, url ):
        """Determine the tool's specific parameters"""
        try:
            return url.split('/')[5] or None
        except:
            return None

    def run_next( self ):
        """
        Run the next item in the queue (a job waiting to run or finish)
        """
        while 1:
            ( op, obj ) = self.work_queue.get()
            if op is self.STOP_SIGNAL:
                return
            try:
                if op == 'queue':
                    self.queue_job( obj )
                elif op == 'finish':
                    self.finish_job( obj )
                elif op == 'fail':
                    self.fail_job( obj )
            except:
                log.exception( "Uncaught exception %sing job" % op )

    def queue_job( self, job_wrapper ):
        """Create SGE script for a job and submit it to the SGE queue"""

        try:
            job_wrapper.prepare()
            command_line = job_wrapper.get_command_line()
        except:
            job_wrapper.fail( "failure preparing job", exception=True )
            log.exception("failure running job %d" % job_wrapper.job_id)
            return

        runner_url = job_wrapper.tool.job_runner

        # This is silly, why would we queue a job with no command line?
        if not command_line:
            job_wrapper.finish( '', '' )
            return

        # Check for deletion before we change state
        if job_wrapper.get_state() == model.Job.states.DELETED:
            log.debug( "Job %s deleted by user before it entered the SGE queue" % job_wrapper.job_id )
            job_wrapper.cleanup()
            return

        # Change to queued state immediately
        job_wrapper.change_state( model.Job.states.QUEUED )

        if self.determine_sge_cell( runner_url ) != self.default_cell:
            # TODO: support multiple cells
            log.warning( "(%s) Using multiple SGE cells is not supported. This job will be submitted to the default cell." % job_wrapper.job_id )
        sge_queue_name = self.determine_sge_queue( runner_url )
        sge_project_name = self.determine_sge_project( runner_url )
        sge_extra_params = self.determine_sge_tool_parameters( runner_url )

        # define job attributes
        ofile = "%s/database/pbs/%s.o" % (os.getcwd(), job_wrapper.job_id)
        efile = "%s/database/pbs/%s.e" % (os.getcwd(), job_wrapper.job_id)
        jt = self.ds.createJobTemplate()
        jt.remoteCommand = "%s/database/pbs/galaxy_%s.sh" % (os.getcwd(), job_wrapper.job_id)
        jt.outputPath = ":%s" % ofile
        jt.errorPath = ":%s" % efile
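        # Queue, project, and any extra options parsed from the runner URL are
        # passed straight through to SGE as native (qsub-style) arguments.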
        nativeSpec = []
        if sge_queue_name is not None:
            nativeSpec.append( "-q '%s'" % sge_queue_name )
        if sge_project_name is not None:
            nativeSpec.append( "-P '%s'" % sge_project_name )
        if sge_extra_params is not None:
            nativeSpec.append( sge_extra_params )
        if len(nativeSpec) > 0:
            jt.nativeSpecification = ' '.join(nativeSpec)

        script = sge_template % (job_wrapper.galaxy_lib_dir, os.path.abspath( job_wrapper.working_directory ), command_line)
        if self.app.config.set_metadata_externally:
            script += "cd %s\n" % os.path.abspath( os.getcwd() )
            script += "%s\n" % job_wrapper.setup_external_metadata( exec_dir = os.path.abspath( os.getcwd() ),
                                                                    tmp_dir = self.app.config.new_file_path,
                                                                    dataset_files_path = self.app.model.Dataset.file_path,
                                                                    output_fnames = job_wrapper.get_output_fnames(),
                                                                    set_extension = False,
                                                                    kwds = { 'overwrite' : False } ) # we don't want to overwrite metadata that was copied over in init_meta(), as per established behavior
        fh = file( jt.remoteCommand, "w" )
        fh.write( script )
        fh.close()
        os.chmod( jt.remoteCommand, 0750 )

        # job was deleted while we were preparing it
        if job_wrapper.get_state() == model.Job.states.DELETED:
            log.debug( "Job %s deleted by user before it entered the SGE queue" % job_wrapper.job_id )
            self.cleanup( ( ofile, efile, jt.remoteCommand ) )
            job_wrapper.cleanup()
            return

        galaxy_job_id = job_wrapper.job_id
        log.debug("(%s) submitting file %s" % ( galaxy_job_id, jt.remoteCommand ) )
        log.debug("(%s) command is: %s" % ( galaxy_job_id, command_line ) )
        # runJob will raise if there's a submit problem
        job_id = self.ds.runJob(jt)
        if sge_queue_name is None:
            log.debug("(%s) queued in default queue as %s" % (galaxy_job_id, job_id) )
        else:
            log.debug("(%s) queued in %s queue as %s" % (galaxy_job_id, sge_queue_name, job_id) )

        # store runner information for tracking if Galaxy restarts
        job_wrapper.set_runner( runner_url, job_id )

        # Store SGE related state information for job
        sge_job_state = SGEJobState()
        sge_job_state.job_wrapper = job_wrapper
        sge_job_state.job_id = job_id
        sge_job_state.ofile = ofile
        sge_job_state.efile = efile
        sge_job_state.job_file = jt.remoteCommand
        sge_job_state.old_state = 'new'
        sge_job_state.running = False
        sge_job_state.runner_url = runner_url

        # delete the job template
        self.ds.deleteJobTemplate( jt )

        # Add to our 'queue' of jobs to monitor
        self.monitor_queue.put( sge_job_state )

    def monitor( self ):
        """
        Watches jobs currently in the SGE queue and deals with state changes
        (queued to running) and job completion.
        """
        while 1:
            # Take any new watched jobs and put them on the monitor list
            try:
                while 1:
                    sge_job_state = self.monitor_queue.get_nowait()
                    if sge_job_state is self.STOP_SIGNAL:
                        # TODO: This is where any cleanup would occur
                        self.ds.exit()
                        return
                    self.watched.append( sge_job_state )
            except Empty:
                pass
            # Iterate over the list of watched jobs and check state
            self.check_watched_items()
            # Sleep a bit before the next state check
            time.sleep( 1 )

    def check_watched_items( self ):
        """
        Called by the monitor thread to look at each watched job and deal
        with state changes.
        """
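        # Jobs that have finished or failed are handed off to the worker threads
        # and dropped from the watch list; everything else is carried over to
        # the next monitor pass.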
        new_watched = []
        for sge_job_state in self.watched:
            job_id = sge_job_state.job_id
            galaxy_job_id = sge_job_state.job_wrapper.job_id
            old_state = sge_job_state.old_state
            try:
                state = self.ds.getJobProgramStatus( job_id )
            except DRMAA.InvalidJobError:
                # we should only get here if an orphaned job was put into the queue at app startup
                log.debug("(%s/%s) job left SGE queue" % ( galaxy_job_id, job_id ) )
                self.work_queue.put( ( 'finish', sge_job_state ) )
                continue
            except Exception, e:
                # so we don't kill the monitor thread
                log.exception("(%s/%s) Unable to check job status" % ( galaxy_job_id, job_id ) )
                log.warning("(%s/%s) job will now be errored" % ( galaxy_job_id, job_id ) )
                sge_job_state.fail_message = "Cluster could not complete job"
                self.work_queue.put( ( 'fail', sge_job_state ) )
                continue
            if state != old_state:
                log.debug("(%s/%s) state change: %s" % ( galaxy_job_id, job_id, DRMAA_state[state] ) )
            if state == DRMAA.Session.RUNNING and not sge_job_state.running:
                sge_job_state.running = True
                sge_job_state.job_wrapper.change_state( model.Job.states.RUNNING )
            if state in ( DRMAA.Session.DONE, DRMAA.Session.FAILED ):
                self.work_queue.put( ( 'finish', sge_job_state ) )
                continue
            sge_job_state.old_state = state
            new_watched.append( sge_job_state )
        # Replace the watch list with the updated version
        self.watched = new_watched

    def finish_job( self, sge_job_state ):
        """
        Get the output/error for a finished job, pass to `job_wrapper.finish`
        and clean up all the SGE temporary files.
        """
        ofile = sge_job_state.ofile
        efile = sge_job_state.efile
        job_file = sge_job_state.job_file
        # collect the output
        try:
            ofh = file(ofile, "r")
            efh = file(efile, "r")
            stdout = ofh.read()
            stderr = efh.read()
        except:
            stdout = ''
            stderr = 'Job output not returned from cluster'
            log.debug(stderr)

        try:
            sge_job_state.job_wrapper.finish( stdout, stderr )
        except:
            log.exception("Job wrapper finish method failed")

        # clean up the sge files
        self.cleanup( ( ofile, efile, job_file ) )

    def fail_job( self, sge_job_state ):
        """
        Separated out so we can use the worker threads for it.
        """
        self.stop_job( self.sa_session.query( self.app.model.Job ).get( sge_job_state.job_wrapper.job_id ) )
        sge_job_state.job_wrapper.fail( sge_job_state.fail_message )
        self.cleanup( ( sge_job_state.ofile, sge_job_state.efile, sge_job_state.job_file ) )

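    # When the server is running with 'debug' enabled in the config, cleanup is
    # skipped so the job script and output/error files are left on disk.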
    def cleanup( self, files ):
        if not asbool( self.app.config.get( 'debug', False ) ):
            for file in files:
                if os.access( file, os.R_OK ):
                    os.unlink( file )

    def put( self, job_wrapper ):
        """Add a job to the queue (by job identifier)"""
        # Change to queued state before handing to worker thread so the runner won't pick it up again
        job_wrapper.change_state( model.Job.states.QUEUED )
        self.work_queue.put( ( 'queue', job_wrapper ) )

    def shutdown( self ):
        """Attempts to gracefully shut down the monitor and worker threads"""
        log.info( "sending stop signal to worker threads" )
        self.monitor_queue.put( self.STOP_SIGNAL )
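        # The monitor thread closes the DRMAA session (self.ds.exit()) when it
        # receives the stop signal; each worker thread exits when it pulls
        # STOP_SIGNAL off the work queue.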
        for i in range( len( self.work_threads ) ):
            self.work_queue.put( ( self.STOP_SIGNAL, None ) )
        log.info( "sge job runner stopped" )

    def stop_job( self, job ):
        """Attempts to delete a job from the SGE queue"""
        try:
            self.ds.control( job.job_runner_external_id, DRMAA.Session.TERMINATE )
            log.debug( "(%s/%s) Removed from SGE queue at user's request" % ( job.id, job.job_runner_external_id ) )
        except DRMAA.InvalidJobError:
            log.debug( "(%s/%s) User killed running job, but it was already dead" % ( job.id, job.job_runner_external_id ) )

    def recover( self, job, job_wrapper ):
        """Recovers jobs stuck in the queued/running state when Galaxy started"""
        sge_job_state = SGEJobState()
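        # Rebuild the same file paths that queue_job() constructed so the
        # recovered job can be monitored and cleaned up normally.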
        sge_job_state.ofile = "%s/database/pbs/%s.o" % (os.getcwd(), job.id)
        sge_job_state.efile = "%s/database/pbs/%s.e" % (os.getcwd(), job.id)
        sge_job_state.job_file = "%s/database/pbs/galaxy_%s.sh" % (os.getcwd(), job.id)
        sge_job_state.job_id = str( job.job_runner_external_id )
        sge_job_state.runner_url = job_wrapper.tool.job_runner
        job_wrapper.command_line = job.command_line
        sge_job_state.job_wrapper = job_wrapper
        if job.state == model.Job.states.RUNNING:
            log.debug( "(%s/%s) is still in running state, adding to the SGE queue" % ( job.id, job.job_runner_external_id ) )
            sge_job_state.old_state = DRMAA.Session.RUNNING
            sge_job_state.running = True
            self.monitor_queue.put( sge_job_state )
        elif job.state == model.Job.states.QUEUED:
            log.debug( "(%s/%s) is still in SGE queued state, adding to the SGE queue" % ( job.id, job.job_runner_external_id ) )
            sge_job_state.old_state = DRMAA.Session.QUEUED_ACTIVE
            sge_job_state.running = False
            self.monitor_queue.put( sge_job_state )