import os, logging, threading, time
from datetime import timedelta
from Queue import Queue, Empty

from galaxy import model
from galaxy.datatypes.data import nice_size
from galaxy.util.bunch import Bunch

from paste.deploy.converters import asbool

import pkg_resources

try:
    pkg_resources.require( "pbs_python" )
    pbs = __import__( "pbs" )
except:
    pbs = None

log = logging.getLogger( __name__ )

pbs_template = """#!/bin/sh
GALAXY_LIB="%s"
if [ "$GALAXY_LIB" != "None" ]; then
    if [ -n "$PYTHONPATH" ]; then
        export PYTHONPATH="$GALAXY_LIB:$PYTHONPATH"
    else
        export PYTHONPATH="$GALAXY_LIB"
    fi
fi
cd %s
%s
"""

pbs_symlink_template = """#!/bin/sh
GALAXY_LIB="%s"
if [ "$GALAXY_LIB" != "None" ]; then
    if [ -n "$PYTHONPATH" ]; then
        export PYTHONPATH="$GALAXY_LIB:$PYTHONPATH"
    else
        export PYTHONPATH="$GALAXY_LIB"
    fi
fi
for dataset in %s; do
    dir=`dirname $dataset`
    file=`basename $dataset`
    [ ! -d $dir ] && mkdir -p $dir
    [ ! -e $dataset ] && ln -s %s/$file $dataset
done
cd %s
%s
"""

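# The "%s" slots in the templates above are filled in by queue_job() below: the
# first is the Galaxy library directory used for PYTHONPATH; the symlink template
# additionally takes the space-separated dataset paths and the configured
# pbs_stage_path; and the last two slots in both templates are the job's working
# directory and its command line.
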
# From pbs' job.h
JOB_EXIT_STATUS = {
    0: "job exec successful",
    -1: "job exec failed, before files, no retry",
    -2: "job exec failed, after files, no retry",
    -3: "job execution failed, do retry",
    -4: "job aborted on MOM initialization",
    -5: "job aborted on MOM init, chkpt, no migrate",
    -6: "job aborted on MOM init, chkpt, ok migrate",
    -7: "job restart failed",
    -8: "exec() of user command failed",
}

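# Positive exit statuses are intentionally not listed: when PBS reports one it is
# typically the exit code of the job script itself, and check_watched_items()
# below falls back to logging it as "Unknown error: <status>".
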
class PBSJobState( object ):
    def __init__( self ):
        """
        Encapsulates state related to a job that is being run via PBS and
        that we need to monitor.
        """
        self.job_wrapper = None
        self.job_id = None
        self.old_state = None
        self.running = False
        self.job_file = None
        self.ofile = None
        self.efile = None
        self.runner_url = None
        self.check_count = 0
        self.stop_job = False

class PBSJobRunner( object ):
    """
    Job runner backed by a finite pool of worker threads. FIFO scheduling
    """
    STOP_SIGNAL = object()
    def __init__( self, app ):
        """Initialize this job runner and start the monitor thread"""
        # Check if PBS was importable, fail if not
        if pbs is None:
            raise Exception( "PBSJobRunner requires pbs-python which was not found" )
        if app.config.pbs_application_server and app.config.outputs_to_working_directory:
            raise Exception( "pbs_application_server (file staging) and outputs_to_working_directory options are mutually exclusive" )
        self.app = app
        self.sa_session = app.model.context
        # 'watched' and 'queue' are both used to keep track of jobs to watch.
        # 'queue' is used to add new watched jobs, and can be called from
        # any thread (usually by the 'queue_job' method). 'watched' must only
        # be modified by the monitor thread, which will move items from 'queue'
        # to 'watched' and then manage the watched jobs.
        self.watched = []
        self.monitor_queue = Queue()
        # set the default server during startup
        self.default_pbs_server = None
        self.determine_pbs_server( 'pbs:///' )
        self.job_walltime = None
        if self.app.config.job_walltime is not None:
            h, m, s = [ int( v ) for v in self.app.config.job_walltime.split( ':' ) ]
            self.job_walltime = timedelta( 0, s, 0, 0, m, h )
        self.monitor_thread = threading.Thread( target=self.monitor )
        self.monitor_thread.start()
        self.work_queue = Queue()
        self.work_threads = []
        nworkers = app.config.cluster_job_queue_workers
        for i in range( nworkers ):
            worker = threading.Thread( target=self.run_next )
            worker.start()
            self.work_threads.append( worker )
        log.debug( "%d workers ready" % nworkers )

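    # Note: the job_walltime option parsed above is expected to be an "HH:MM:SS"
    # string, the same format PBS reports in resources_used.walltime (see
    # check_watched_items below).
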
    def determine_pbs_server( self, url, rewrite = False ):
        """Determine what PBS server we are connecting to"""
        url_split = url.split("/")
        server = url_split[2]
        if server == "":
            if not self.default_pbs_server:
                self.default_pbs_server = pbs.pbs_default()
                log.debug( "Set default PBS server to %s" % self.default_pbs_server )
            server = self.default_pbs_server
            url_split[2] = server
        if server is None:
            raise Exception( "Could not find torque server" )
        if rewrite:
            return ( server, "/".join( url_split ) )
        else:
            return server

    def determine_pbs_queue( self, url ):
        """Determine what PBS queue we are submitting to"""
        try:
            return url.split('/')[3] or None
        except:
            return None

    def determine_pbs_options( self, url ):
        try:
            opts = url.split('/')[4].strip().lstrip('-').split(' -')
            assert opts != ['']
        except:
            return []
        rval = []
        for opt in opts:
            name, value = opt.split( None, 1 )
            if name == 'l':
                resource_attrs = value.split(',')
                for j, ( res, val ) in enumerate( [ a.split('=', 1) for a in resource_attrs ] ):
                    rval.append( dict( name = pbs.ATTR_l, value = val, resource = res ) )
            else:
                rval.append( dict( name = getattr( pbs, 'ATTR_' + name ), value = value ) )
        return rval

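    # For illustration, a runner URL of the form
    #   pbs://my.torque.server/batch/-l nodes=1:ppn=4,mem=8gb -p 128
    # (hypothetical server, queue and options) would yield "my.torque.server"
    # from determine_pbs_server(), "batch" from determine_pbs_queue(), and
    # roughly the following from determine_pbs_options():
    #   [ dict( name = pbs.ATTR_l, value = '1:ppn=4', resource = 'nodes' ),
    #     dict( name = pbs.ATTR_l, value = '8gb', resource = 'mem' ),
    #     dict( name = pbs.ATTR_p, value = '128' ) ]
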
    def run_next( self ):
        """
        Run the next item in the queue (a job waiting to run or finish)
        """
        while 1:
            ( op, obj ) = self.work_queue.get()
            if op is self.STOP_SIGNAL:
                return
            try:
                if op == 'queue':
                    self.queue_job( obj )
                elif op == 'finish':
                    self.finish_job( obj )
                elif op == 'fail':
                    self.fail_job( obj )
            except:
                log.exception( "Uncaught exception %sing job" % op )

    def queue_job( self, job_wrapper ):
        """Create PBS script for a job and submit it to the PBS queue"""

        try:
            job_wrapper.prepare()
            command_line = job_wrapper.get_command_line()
        except:
            job_wrapper.fail( "failure preparing job", exception=True )
            log.exception("failure running job %d" % job_wrapper.job_id)
            return

        runner_url = job_wrapper.tool.job_runner

        # This is silly, why would we queue a job with no command line?
        if not command_line:
            job_wrapper.finish( '', '' )
            return

        # Check for deletion before we change state
        if job_wrapper.get_state() == model.Job.states.DELETED:
            log.debug( "Job %s deleted by user before it entered the PBS queue" % job_wrapper.job_id )
            job_wrapper.cleanup()
            return

        ( pbs_server_name, runner_url ) = self.determine_pbs_server( runner_url, rewrite = True )
        pbs_queue_name = self.determine_pbs_queue( runner_url )
        pbs_options = self.determine_pbs_options( runner_url )
        c = pbs.pbs_connect( pbs_server_name )
        if c <= 0:
            errno, text = pbs.error()
            job_wrapper.fail( "Unable to queue job for execution. Resubmitting the job may succeed." )
            log.error( "Connection to PBS server for submit failed: %s: %s" % ( errno, text ) )
            return

        # define job attributes
        ofile = "%s/%s.o" % (self.app.config.cluster_files_directory, job_wrapper.job_id)
        efile = "%s/%s.e" % (self.app.config.cluster_files_directory, job_wrapper.job_id)

        output_fnames = job_wrapper.get_output_fnames()

        # If an application server is set, we're staging
        if self.app.config.pbs_application_server:
            pbs_ofile = self.app.config.pbs_application_server + ':' + ofile
            pbs_efile = self.app.config.pbs_application_server + ':' + efile
            output_files = [ str( o ) for o in output_fnames ]
            stagein = self.get_stage_in_out( job_wrapper.get_input_fnames() + output_files, symlink=True )
            stageout = self.get_stage_in_out( output_files )
            attrs = [
                dict( name = pbs.ATTR_o, value = pbs_ofile ),
                dict( name = pbs.ATTR_e, value = pbs_efile ),
                dict( name = pbs.ATTR_stagein, value = stagein ),
                dict( name = pbs.ATTR_stageout, value = stageout ),
            ]
        # If not, we're using NFS
        else:
            attrs = [
                dict( name = pbs.ATTR_o, value = ofile ),
                dict( name = pbs.ATTR_e, value = efile ),
            ]

        # define PBS job options
        attrs.append( dict( name = pbs.ATTR_N, value = str( "%s_%s_%s" % ( job_wrapper.job_id, job_wrapper.tool.id, job_wrapper.user ) ) ) )
        job_attrs = pbs.new_attropl( len( attrs ) + len( pbs_options ) )
        for i, attr in enumerate( attrs + pbs_options ):
            job_attrs[i].name = attr['name']
            job_attrs[i].value = attr['value']
            if 'resource' in attr:
                job_attrs[i].resource = attr['resource']
        exec_dir = os.path.abspath( job_wrapper.working_directory )

        # write the job script
        if self.app.config.pbs_stage_path != '':
            script = pbs_symlink_template % (job_wrapper.galaxy_lib_dir, " ".join(job_wrapper.get_input_fnames() + output_files), self.app.config.pbs_stage_path, exec_dir, command_line)
        else:
            script = pbs_template % ( job_wrapper.galaxy_lib_dir, exec_dir, command_line )
        if self.app.config.set_metadata_externally:
            script += "cd %s\n" % os.path.abspath( os.getcwd() )
            script += "%s\n" % job_wrapper.setup_external_metadata( exec_dir = os.path.abspath( os.getcwd() ),
                                                                    tmp_dir = self.app.config.new_file_path,
                                                                    dataset_files_path = self.app.model.Dataset.file_path,
                                                                    output_fnames = output_fnames,
                                                                    set_extension = False,
                                                                    kwds = { 'overwrite' : False } ) # we don't want to overwrite metadata that was copied over in init_meta(), as per established behavior
        job_file = "%s/%s.sh" % (self.app.config.cluster_files_directory, job_wrapper.job_id)
        fh = file(job_file, "w")
        fh.write(script)
        fh.close()

        # job was deleted while we were preparing it
        if job_wrapper.get_state() == model.Job.states.DELETED:
            log.debug( "Job %s deleted by user before it entered the PBS queue" % job_wrapper.job_id )
            pbs.pbs_disconnect(c)
            self.cleanup( ( ofile, efile, job_file ) )
            job_wrapper.cleanup()
            return

        # submit
        galaxy_job_id = job_wrapper.job_id
        log.debug("(%s) submitting file %s" % ( galaxy_job_id, job_file ) )
        log.debug("(%s) command is: %s" % ( galaxy_job_id, command_line ) )
        job_id = pbs.pbs_submit(c, job_attrs, job_file, pbs_queue_name, None)
        pbs.pbs_disconnect(c)

        # check to see if it submitted
        if not job_id:
            errno, text = pbs.error()
            log.debug( "(%s) pbs_submit failed, PBS error %d: %s" % (galaxy_job_id, errno, text) )
            job_wrapper.fail( "Unable to run this job due to a cluster error" )
            return

        if pbs_queue_name is None:
            log.debug("(%s) queued in default queue as %s" % (galaxy_job_id, job_id) )
        else:
            log.debug("(%s) queued in %s queue as %s" % (galaxy_job_id, pbs_queue_name, job_id) )

        # store runner information for tracking if Galaxy restarts
        job_wrapper.set_runner( runner_url, job_id )

        # Store PBS related state information for job
        pbs_job_state = PBSJobState()
        pbs_job_state.job_wrapper = job_wrapper
        pbs_job_state.job_id = job_id
        pbs_job_state.ofile = ofile
        pbs_job_state.efile = efile
        pbs_job_state.job_file = job_file
        pbs_job_state.old_state = 'N'
        pbs_job_state.running = False
        pbs_job_state.runner_url = runner_url

        # Add to our 'queue' of jobs to monitor
        self.monitor_queue.put( pbs_job_state )

    def monitor( self ):
        """
        Watches jobs currently in the PBS queue and deals with state changes
        (queued to running) and job completion
        """
        while 1:
            # Take any new watched jobs and put them on the monitor list
            try:
                while 1:
                    pbs_job_state = self.monitor_queue.get_nowait()
                    if pbs_job_state is self.STOP_SIGNAL:
                        # TODO: This is where any cleanup would occur
                        return
                    self.watched.append( pbs_job_state )
            except Empty:
                pass
            # Iterate over the list of watched jobs and check state
            try:
                self.check_watched_items()
            except:
                log.exception( "Uncaught exception checking jobs" )
            # Sleep a bit before the next state check
            time.sleep( 1 )

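    # PBS reports job states as single letters. check_watched_items() below only
    # acts on 'R' (running) and 'C' (completed; generally only visible when the
    # server's keep_completed option is set); 'N' and 'Q' are used by this runner
    # as "newly submitted" and "queued" markers for old_state.
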
    def check_watched_items( self ):
        """
        Called by the monitor thread to look at each watched job and deal
        with state changes.
        """
        new_watched = []
        # reduce pbs load by batching status queries
        ( failures, statuses ) = self.check_all_jobs()
        for pbs_job_state in self.watched:
            job_id = pbs_job_state.job_id
            galaxy_job_id = pbs_job_state.job_wrapper.job_id
            old_state = pbs_job_state.old_state
            pbs_server_name = self.determine_pbs_server( pbs_job_state.runner_url )
            if pbs_server_name in failures:
                log.debug( "(%s/%s) Skipping state check because PBS server connection failed" % ( galaxy_job_id, job_id ) )
                new_watched.append( pbs_job_state )
                continue
            try:
                status = statuses[job_id]
            except KeyError:
                try:
                    # Recheck to make sure it wasn't a communication problem
                    self.check_single_job( pbs_server_name, job_id )
                    log.warning( "(%s/%s) PBS job was not in state check list, but was found with individual state check" % ( galaxy_job_id, job_id ) )
                    new_watched.append( pbs_job_state )
                except:
                    errno, text = pbs.error()
                    if errno == 15001:
                        # 15001 == job not in queue
                        log.debug("(%s/%s) PBS job has left queue" % (galaxy_job_id, job_id) )
                        self.work_queue.put( ( 'finish', pbs_job_state ) )
                    else:
                        # Unhandled error, continue to monitor
                        log.info("(%s/%s) PBS state check resulted in error (%d): %s" % (galaxy_job_id, job_id, errno, text) )
                        new_watched.append( pbs_job_state )
                continue
            if status.job_state != old_state:
                log.debug("(%s/%s) PBS job state changed from %s to %s" % ( galaxy_job_id, job_id, old_state, status.job_state ) )
            if status.job_state == "R" and not pbs_job_state.running:
                pbs_job_state.running = True
                pbs_job_state.job_wrapper.change_state( model.Job.states.RUNNING )
            if status.job_state == "R" and ( pbs_job_state.check_count % 20 ) == 0:
                # Every 20th time the job status is checked, do limit checks (if configured)
                if self.app.config.output_size_limit > 0:
                    # Check the size of the job outputs
                    fail = False
                    for outfile, size in pbs_job_state.job_wrapper.check_output_sizes():
                        if size > self.app.config.output_size_limit:
                            pbs_job_state.fail_message = 'Job output grew too large (greater than %s), please try different job parameters or' \
                                % nice_size( self.app.config.output_size_limit )
                            log.warning( '(%s/%s) Dequeueing job due to output %s growing larger than %s limit' \
                                % ( galaxy_job_id, job_id, os.path.basename( outfile ), nice_size( self.app.config.output_size_limit ) ) )
                            pbs_job_state.stop_job = True
                            self.work_queue.put( ( 'fail', pbs_job_state ) )
                            fail = True
                            break
                    if fail:
                        continue
                if self.job_walltime is not None:
                    # Check the job's execution time
                    if status.get( 'resources_used', False ):
                        # resources_used may not be in the status for new jobs
                        h, m, s = [ int( i ) for i in status.resources_used.walltime.split( ':' ) ]
                        time_executing = timedelta( 0, s, 0, 0, m, h )
                        if time_executing > self.job_walltime:
                            pbs_job_state.fail_message = 'Job ran longer than maximum allowed execution time (%s), please try different job parameters or' \
                                % self.app.config.job_walltime
                            log.warning( '(%s/%s) Dequeueing job since walltime has been reached' \
                                % ( galaxy_job_id, job_id ) )
                            pbs_job_state.stop_job = True
                            self.work_queue.put( ( 'fail', pbs_job_state ) )
                            continue
            elif status.job_state == "C":
                # "keep_completed" is enabled in PBS, so try to check exit status
                try:
                    assert int( status.exit_status ) == 0
                    log.debug("(%s/%s) PBS job has completed successfully" % ( galaxy_job_id, job_id ) )
                except AssertionError:
                    pbs_job_state.fail_message = 'Job cannot be completed due to a cluster error. Please retry or'
                    log.error( '(%s/%s) PBS job failed: %s' % ( galaxy_job_id, job_id, JOB_EXIT_STATUS.get( int( status.exit_status ), 'Unknown error: %s' % status.exit_status ) ) )
                    self.work_queue.put( ( 'fail', pbs_job_state ) )
                    continue
                except AttributeError:
                    # No exit_status, can't verify proper completion so we just have to assume success.
                    log.debug("(%s/%s) PBS job has completed" % ( galaxy_job_id, job_id ) )
                self.work_queue.put( ( 'finish', pbs_job_state ) )
                continue
            pbs_job_state.old_state = status.job_state
            new_watched.append( pbs_job_state )
        # Replace the watch list with the updated version
        self.watched = new_watched

    def check_all_jobs( self ):
        """
        Returns a list of servers that failed to be contacted and a dict
        of "job_id : status" pairs (where status is a bunchified version
        of the API's structure).
        """
        servers = []
        failures = []
        statuses = {}
        for pbs_job_state in self.watched:
            pbs_server_name = self.determine_pbs_server( pbs_job_state.runner_url )
            if pbs_server_name not in servers:
                servers.append( pbs_server_name )
            pbs_job_state.check_count += 1
        for pbs_server_name in servers:
            c = pbs.pbs_connect( pbs_server_name )
            if c <= 0:
                log.debug("connection to PBS server %s for state check failed" % pbs_server_name )
                failures.append( pbs_server_name )
                continue
            stat_attrl = pbs.new_attrl(3)
            stat_attrl[0].name = pbs.ATTR_state
            stat_attrl[1].name = pbs.ATTR_used
            stat_attrl[2].name = pbs.ATTR_exitstat
            jobs = pbs.pbs_statjob( c, None, stat_attrl, None )
            pbs.pbs_disconnect( c )
            statuses.update( self.convert_statjob_to_bunches( jobs ) )
        return( ( failures, statuses ) )

    def convert_statjob_to_bunches( self, statjob_out ):
        statuses = {}
        for job in statjob_out:
            status = {}
            for attrib in job.attribs:
                if attrib.resource is None:
                    status[ attrib.name ] = attrib.value
                else:
                    if attrib.name not in status:
                        status[ attrib.name ] = Bunch()
                    status[ attrib.name ][ attrib.resource ] = attrib.value
            statuses[ job.name ] = Bunch( **status )
        return statuses

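    # As an illustration (assuming Torque's usual attribute names), a running job
    # might end up stored as:
    #   statuses['42.my.torque.server'] = Bunch(
    #       job_state = 'R',
    #       resources_used = Bunch( walltime = '00:05:12', cput = '00:04:58' ) )
    # exit_status only appears once the job has completed.
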
    def check_single_job( self, pbs_server_name, job_id ):
        """
        Returns the state of a single job, used to make sure a job is
        really dead.
        """
        c = pbs.pbs_connect( pbs_server_name )
        if c <= 0:
            log.debug("connection to PBS server %s for state check failed" % pbs_server_name )
            return None
        stat_attrl = pbs.new_attrl(1)
        stat_attrl[0].name = pbs.ATTR_state
        jobs = pbs.pbs_statjob( c, job_id, stat_attrl, None )
        pbs.pbs_disconnect( c )
        return jobs[0].attribs[0].value

    def finish_job( self, pbs_job_state ):
        """
        Get the output/error for a finished job, pass to `job_wrapper.finish`
        and cleanup all the PBS temporary files.
        """
        ofile = pbs_job_state.ofile
        efile = pbs_job_state.efile
        job_file = pbs_job_state.job_file
        # collect the output
        try:
            ofh = file(ofile, "r")
            efh = file(efile, "r")
            stdout = ofh.read()
            stderr = efh.read()
        except:
            stdout = ''
            stderr = 'Job output not returned by PBS: the output datasets were deleted while the job was running, the job was manually dequeued or there was a cluster error.'
            log.debug(stderr)

        try:
            pbs_job_state.job_wrapper.finish( stdout, stderr )
        except:
            log.exception("Job wrapper finish method failed")
            pbs_job_state.job_wrapper.fail("Unable to finish job", exception=True)

        # clean up the pbs files
        self.cleanup( ( ofile, efile, job_file ) )

    def fail_job( self, pbs_job_state ):
        """
        Separated out so we can use the worker threads for it.
        """
        if pbs_job_state.stop_job:
            self.stop_job( self.sa_session.query( self.app.model.Job ).get( pbs_job_state.job_wrapper.job_id ) )
        pbs_job_state.job_wrapper.fail( pbs_job_state.fail_message )
        self.cleanup( ( pbs_job_state.ofile, pbs_job_state.efile, pbs_job_state.job_file ) )

    def cleanup( self, files ):
        if not asbool( self.app.config.get( 'debug', False ) ):
            for file in files:
                if os.access( file, os.R_OK ):
                    os.unlink( file )

    def put( self, job_wrapper ):
        """Add a job to the queue (by job identifier)"""
        # Change to queued state before handing to worker thread so the runner won't pick it up again
        job_wrapper.change_state( model.Job.states.QUEUED )
        self.work_queue.put( ( 'queue', job_wrapper ) )

    def shutdown( self ):
        """Attempts to gracefully shut down the monitor thread"""
        log.info( "sending stop signal to worker threads" )
        self.monitor_queue.put( self.STOP_SIGNAL )
        for i in range( len( self.work_threads ) ):
            self.work_queue.put( ( self.STOP_SIGNAL, None ) )
        log.info( "pbs job runner stopped" )

    def get_stage_in_out( self, fnames, symlink=False ):
        """Convenience function to create a stagein/stageout list"""
        stage = ''
        for fname in fnames:
            if os.access(fname, os.R_OK):
                if stage:
                    stage += ','
                # pathnames are now absolute
                if symlink and self.app.config.pbs_stage_path:
                    stage_name = os.path.join(self.app.config.pbs_stage_path, os.path.split(fname)[1])
                else:
                    stage_name = fname
                stage += "%s@%s:%s" % (stage_name, self.app.config.pbs_dataset_server, fname)
        return stage

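    # For illustration (with a hypothetical pbs_dataset_server of "galaxy.example.org",
    # pbs_stage_path of "/stage" and two readable datasets), the returned string
    # would look like:
    #   /stage/dataset_1.dat@galaxy.example.org:/galaxy/files/dataset_1.dat,/stage/dataset_2.dat@galaxy.example.org:/galaxy/files/dataset_2.dat
    # i.e. a comma-separated list of local_path@host:remote_path entries, the
    # format PBS expects for its stagein/stageout attributes.
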
    def stop_job( self, job ):
        """Attempts to delete a job from the PBS queue"""
        pbs_server_name = self.determine_pbs_server( str( job.job_runner_name ) )
        c = pbs.pbs_connect( pbs_server_name )
        if c <= 0:
            log.debug("(%s/%s) Connection to PBS server for job delete failed" % ( job.id, job.job_runner_external_id ) )
            return
        pbs.pbs_deljob( c, str( job.job_runner_external_id ), 'NULL' )
        pbs.pbs_disconnect( c )
        log.debug( "(%s/%s) Removed from PBS queue before job completion" % ( job.id, job.job_runner_external_id ) )

    def recover( self, job, job_wrapper ):
        """Recovers jobs stuck in the queued/running state when Galaxy started"""
        pbs_job_state = PBSJobState()
        pbs_job_state.ofile = "%s/%s.o" % (self.app.config.cluster_files_directory, job.id)
        pbs_job_state.efile = "%s/%s.e" % (self.app.config.cluster_files_directory, job.id)
        pbs_job_state.job_file = "%s/%s.sh" % (self.app.config.cluster_files_directory, job.id)
        pbs_job_state.job_id = str( job.job_runner_external_id )
        pbs_job_state.runner_url = job_wrapper.tool.job_runner
        job_wrapper.command_line = job.command_line
        pbs_job_state.job_wrapper = job_wrapper
        if job.state == model.Job.states.RUNNING:
            log.debug( "(%s/%s) is still in running state, adding to the PBS queue" % ( job.id, job.job_runner_external_id ) )
            pbs_job_state.old_state = 'R'
            pbs_job_state.running = True
            self.monitor_queue.put( pbs_job_state )
        elif job.state == model.Job.states.QUEUED:
            log.debug( "(%s/%s) is still in PBS queued state, adding to the PBS queue" % ( job.id, job.job_runner_external_id ) )
            pbs_job_state.old_state = 'Q'
            pbs_job_state.running = False
            self.monitor_queue.put( pbs_job_state )
---|