[2] | 1 | import os, logging, threading, time |
---|
| 2 | from datetime import timedelta |
---|
| 3 | from Queue import Queue, Empty |
---|
| 4 | |
---|
| 5 | from galaxy import model |
---|
| 6 | from galaxy.datatypes.data import nice_size |
---|
| 7 | from galaxy.util.bunch import Bunch |
---|
| 8 | |
---|
| 9 | from paste.deploy.converters import asbool |
---|
| 10 | |
---|
| 11 | import pkg_resources |
---|
| 12 | |
---|
# pbs-python is an optional dependency: if the egg is missing or broken,
# leave `pbs` as None and let PBSJobRunner.__init__ raise a clear error.
# Catch Exception (not a bare except) so SystemExit/KeyboardInterrupt
# raised during interpreter shutdown are not swallowed.
try:
    pkg_resources.require( "pbs_python" )
    pbs = __import__( "pbs" )
except Exception:
    pbs = None

log = logging.getLogger( __name__ )
---|
| 20 | |
---|
# Job script template, interpolated with ( galaxy_lib_dir, working_directory,
# command_line ).  Exports PYTHONPATH so Galaxy's library is importable on
# the cluster node, then runs the tool command from the working directory.
pbs_template = """#!/bin/sh
GALAXY_LIB="%s"
if [ "$GALAXY_LIB" != "None" ]; then
if [ -n "$PYTHONPATH" ]; then
export PYTHONPATH="$GALAXY_LIB:$PYTHONPATH"
else
export PYTHONPATH="$GALAXY_LIB"
fi
fi
cd %s
%s
"""

# Variant used when pbs_stage_path is configured, interpolated with
# ( galaxy_lib_dir, dataset path list, stage path, working_directory,
# command_line ).  Before running the command it symlinks each staged
# dataset from the stage directory back to its expected Galaxy path.
pbs_symlink_template = """#!/bin/sh
GALAXY_LIB="%s"
if [ "$GALAXY_LIB" != "None" ]; then
if [ -n "$PYTHONPATH" ]; then
export PYTHONPATH="$GALAXY_LIB:$PYTHONPATH"
else
export PYTHONPATH="$GALAXY_LIB"
fi
fi
for dataset in %s; do
dir=`dirname $dataset`
file=`basename $dataset`
[ ! -d $dir ] && mkdir -p $dir
[ ! -e $dataset ] && ln -s %s/$file $dataset
done
cd %s
%s
"""

# From pbs' job.h
# Maps the negative exit codes PBS assigns to failed jobs to human-readable
# descriptions; used for logging in check_watched_items.
JOB_EXIT_STATUS = {
    0: "job exec successful",
    -1: "job exec failed, before files, no retry",
    -2: "job exec failed, after files, no retry",
    -3: "job execution failed, do retry",
    -4: "job aborted on MOM initialization",
    -5: "job aborted on MOM init, chkpt, no migrate",
    -6: "job aborted on MOM init, chkpt, ok migrate",
    -7: "job restart failed",
    -8: "exec() of user command failed",
}
---|
| 65 | |
---|
class PBSJobState( object ):
    def __init__( self ):
        """
        Encapsulates state related to a job that is being run via PBS and
        that we need to monitor.
        """
        self.job_wrapper = None   # Galaxy job wrapper driving this job
        self.job_id = None        # PBS (external) job identifier
        self.old_state = None     # last PBS state letter seen ('N'/'Q'/'R'/'C')
        self.running = False      # True once the job has been seen in state 'R'
        self.job_file = None      # path to the generated submit script (.sh)
        self.ofile = None         # path to the PBS stdout capture (.o)
        self.efile = None         # path to the PBS stderr capture (.e)
        self.runner_url = None    # pbs:// runner URL the job was submitted with
        self.check_count = 0      # number of monitor passes; drives periodic limit checks
        self.stop_job = False     # when True, fail_job also dequeues the job from PBS
---|
| 82 | |
---|
class PBSJobRunner( object ):
    """
    Job runner backed by a finite pool of worker threads. FIFO scheduling
    """
    # Sentinel object put on the queues to tell worker/monitor threads to exit
    STOP_SIGNAL = object()
    def __init__( self, app ):
        """Initialize this job runner and start the monitor thread"""
        # Check if PBS was importable, fail if not
        if pbs is None:
            raise Exception( "PBSJobRunner requires pbs-python which was not found" )
        if app.config.pbs_application_server and app.config.outputs_to_working_directory:
            raise Exception( "pbs_application_server (file staging) and outputs_to_working_directory options are mutually exclusive" )
        self.app = app
        self.sa_session = app.model.context
        # 'watched' and 'queue' are both used to keep track of jobs to watch.
        # 'queue' is used to add new watched jobs, and can be called from
        # any thread (usually by the 'queue_job' method). 'watched' must only
        # be modified by the monitor thread, which will move items from 'queue'
        # to 'watched' and then manage the watched jobs.
        self.watched = []
        self.monitor_queue = Queue()
        # set the default server during startup
        self.default_pbs_server = None
        self.determine_pbs_server( 'pbs:///' )
        # Optional walltime limit, parsed from the 'h:m:s' config string into
        # a timedelta (positional args: days, seconds, µs, ms, minutes, hours)
        self.job_walltime = None
        if self.app.config.job_walltime is not None:
            h, m, s = [ int( v ) for v in self.app.config.job_walltime.split( ':' ) ]
            self.job_walltime = timedelta( 0, s, 0, 0, m, h )
        # NOTE(review): the monitor thread is started before self.work_queue
        # is assigned below; check_watched_items (called from monitor) puts
        # items on self.work_queue, so there is a narrow startup window where
        # the attribute may not exist yet -- confirm this is benign.
        self.monitor_thread = threading.Thread( target=self.monitor )
        self.monitor_thread.start()
        self.work_queue = Queue()
        self.work_threads = []
        nworkers = app.config.cluster_job_queue_workers
        for i in range( nworkers ):
            worker = threading.Thread( target=self.run_next )
            worker.start()
            self.work_threads.append( worker )
        log.debug( "%d workers ready" % nworkers )
---|
| 121 | |
---|
| 122 | def determine_pbs_server( self, url, rewrite = False ): |
---|
| 123 | """Determine what PBS server we are connecting to""" |
---|
| 124 | url_split = url.split("/") |
---|
| 125 | server = url_split[2] |
---|
| 126 | if server == "": |
---|
| 127 | if not self.default_pbs_server: |
---|
| 128 | self.default_pbs_server = pbs.pbs_default() |
---|
| 129 | log.debug( "Set default PBS server to %s" % self.default_pbs_server ) |
---|
| 130 | server = self.default_pbs_server |
---|
| 131 | url_split[2] = server |
---|
| 132 | if server is None: |
---|
| 133 | raise Exception( "Could not find torque server" ) |
---|
| 134 | if rewrite: |
---|
| 135 | return ( server, "/".join( url_split ) ) |
---|
| 136 | else: |
---|
| 137 | return server |
---|
| 138 | |
---|
| 139 | def determine_pbs_queue( self, url ): |
---|
| 140 | """Determine what PBS queue we are submitting to""" |
---|
| 141 | try: |
---|
| 142 | return url.split('/')[3] or None |
---|
| 143 | except: |
---|
| 144 | return None |
---|
| 145 | |
---|
| 146 | def determine_pbs_options( self, url ): |
---|
| 147 | try: |
---|
| 148 | opts = url.split('/')[4].strip().lstrip('-').split(' -') |
---|
| 149 | assert opts != [''] |
---|
| 150 | except: |
---|
| 151 | return [] |
---|
| 152 | rval = [] |
---|
| 153 | for opt in opts: |
---|
| 154 | name, value = opt.split( None, 1 ) |
---|
| 155 | if name == 'l': |
---|
| 156 | resource_attrs = value.split(',') |
---|
| 157 | for j, ( res, val ) in enumerate( [ a.split('=', 1) for a in resource_attrs ] ): |
---|
| 158 | rval.append( dict( name = pbs.ATTR_l, value = val, resource = res ) ) |
---|
| 159 | else: |
---|
| 160 | rval.append( dict( name = getattr( pbs, 'ATTR_' + name ), value = value ) ) |
---|
| 161 | return rval |
---|
| 162 | |
---|
| 163 | def run_next( self ): |
---|
| 164 | """ |
---|
| 165 | Run the next item in the queue (a job waiting to run or finish ) |
---|
| 166 | """ |
---|
| 167 | while 1: |
---|
| 168 | ( op, obj ) = self.work_queue.get() |
---|
| 169 | if op is self.STOP_SIGNAL: |
---|
| 170 | return |
---|
| 171 | try: |
---|
| 172 | if op == 'queue': |
---|
| 173 | self.queue_job( obj ) |
---|
| 174 | elif op == 'finish': |
---|
| 175 | self.finish_job( obj ) |
---|
| 176 | elif op == 'fail': |
---|
| 177 | self.fail_job( obj ) |
---|
| 178 | except: |
---|
| 179 | log.exception( "Uncaught exception %sing job" % op ) |
---|
| 180 | |
---|
    def queue_job( self, job_wrapper ):
        """
        Create a PBS script for a job and submit it to the PBS queue.

        Runs on a worker thread (dispatched from run_next with the 'queue'
        op).  On success the job is handed to the monitor thread via
        self.monitor_queue; on any failure the job_wrapper is failed and
        the method returns early.
        """

        try:
            job_wrapper.prepare()
            command_line = job_wrapper.get_command_line()
        except:
            job_wrapper.fail( "failure preparing job", exception=True )
            log.exception("failure running job %d" % job_wrapper.job_id)
            return

        runner_url = job_wrapper.tool.job_runner

        # This is silly, why would we queue a job with no command line?
        if not command_line:
            job_wrapper.finish( '', '' )
            return

        # Check for deletion before we change state
        if job_wrapper.get_state() == model.Job.states.DELETED:
            log.debug( "Job %s deleted by user before it entered the PBS queue" % job_wrapper.job_id )
            job_wrapper.cleanup()
            return

        # Resolve the server/queue/extra options from the tool's runner URL
        ( pbs_server_name, runner_url ) = self.determine_pbs_server( runner_url, rewrite = True )
        pbs_queue_name = self.determine_pbs_queue( runner_url )
        pbs_options = self.determine_pbs_options( runner_url )
        c = pbs.pbs_connect( pbs_server_name )
        if c <= 0:
            errno, text = pbs.error()
            job_wrapper.fail( "Unable to queue job for execution. Resubmitting the job may succeed." )
            log.error( "Connection to PBS server for submit failed: %s: %s" % ( errno, text ) )
            return

        # define job attributes
        ofile = "%s/%s.o" % (self.app.config.cluster_files_directory, job_wrapper.job_id)
        efile = "%s/%s.e" % (self.app.config.cluster_files_directory, job_wrapper.job_id)

        output_fnames = job_wrapper.get_output_fnames()

        # If an application server is set, we're staging
        if self.app.config.pbs_application_server:
            pbs_ofile = self.app.config.pbs_application_server + ':' + ofile
            pbs_efile = self.app.config.pbs_application_server + ':' + efile
            output_files = [ str( o ) for o in output_fnames ]
            stagein = self.get_stage_in_out( job_wrapper.get_input_fnames() + output_files, symlink=True )
            stageout = self.get_stage_in_out( output_files )
            attrs = [
                dict( name = pbs.ATTR_o, value = pbs_ofile ),
                dict( name = pbs.ATTR_e, value = pbs_efile ),
                dict( name = pbs.ATTR_stagein, value = stagein ),
                dict( name = pbs.ATTR_stageout, value = stageout ),
            ]
        # If not, we're using NFS
        else:
            attrs = [
                dict( name = pbs.ATTR_o, value = ofile ),
                dict( name = pbs.ATTR_e, value = efile ),
            ]

        # define PBS job options
        attrs.append( dict( name = pbs.ATTR_N, value = str( "%s_%s_%s" % ( job_wrapper.job_id, job_wrapper.tool.id, job_wrapper.user ) ) ) )
        job_attrs = pbs.new_attropl( len( attrs ) + len( pbs_options ) )
        for i, attr in enumerate( attrs + pbs_options ):
            job_attrs[i].name = attr['name']
            job_attrs[i].value = attr['value']
            if 'resource' in attr:
                job_attrs[i].resource = attr['resource']
        exec_dir = os.path.abspath( job_wrapper.working_directory )

        # write the job script
        # NOTE(review): 'output_files' is only assigned in the
        # pbs_application_server branch above; if pbs_stage_path is set
        # without pbs_application_server this raises NameError -- confirm
        # the two config options are always used together.
        if self.app.config.pbs_stage_path != '':
            script = pbs_symlink_template % (job_wrapper.galaxy_lib_dir, " ".join(job_wrapper.get_input_fnames() + output_files), self.app.config.pbs_stage_path, exec_dir, command_line)
        else:
            script = pbs_template % ( job_wrapper.galaxy_lib_dir, exec_dir, command_line )
        if self.app.config.set_metadata_externally:
            # Append the external metadata-setting command, run from the
            # Galaxy root so relative paths in the command resolve
            script += "cd %s\n" % os.path.abspath( os.getcwd() )
            script += "%s\n" % job_wrapper.setup_external_metadata( exec_dir = os.path.abspath( os.getcwd() ),
                tmp_dir = self.app.config.new_file_path,
                dataset_files_path = self.app.model.Dataset.file_path,
                output_fnames = output_fnames,
                set_extension = False,
                kwds = { 'overwrite' : False } ) #we don't want to overwrite metadata that was copied over in init_meta(), as per established behavior
        job_file = "%s/%s.sh" % (self.app.config.cluster_files_directory, job_wrapper.job_id)
        # NOTE(review): file() is the Python 2 builtin spelling of open()
        fh = file(job_file, "w")
        fh.write(script)
        fh.close()

        # job was deleted while we were preparing it
        if job_wrapper.get_state() == model.Job.states.DELETED:
            log.debug( "Job %s deleted by user before it entered the PBS queue" % job_wrapper.job_id )
            pbs.pbs_disconnect(c)
            self.cleanup( ( ofile, efile, job_file ) )
            job_wrapper.cleanup()
            return

        # submit
        galaxy_job_id = job_wrapper.job_id
        log.debug("(%s) submitting file %s" % ( galaxy_job_id, job_file ) )
        log.debug("(%s) command is: %s" % ( galaxy_job_id, command_line ) )
        job_id = pbs.pbs_submit(c, job_attrs, job_file, pbs_queue_name, None)
        pbs.pbs_disconnect(c)

        # check to see if it submitted
        if not job_id:
            errno, text = pbs.error()
            log.debug( "(%s) pbs_submit failed, PBS error %d: %s" % (galaxy_job_id, errno, text) )
            job_wrapper.fail( "Unable to run this job due to a cluster error" )
            return

        if pbs_queue_name is None:
            log.debug("(%s) queued in default queue as %s" % (galaxy_job_id, job_id) )
        else:
            log.debug("(%s) queued in %s queue as %s" % (galaxy_job_id, pbs_queue_name, job_id) )

        # store runner information for tracking if Galaxy restarts
        job_wrapper.set_runner( runner_url, job_id )

        # Store PBS related state information for job
        pbs_job_state = PBSJobState()
        pbs_job_state.job_wrapper = job_wrapper
        pbs_job_state.job_id = job_id
        pbs_job_state.ofile = ofile
        pbs_job_state.efile = efile
        pbs_job_state.job_file = job_file
        pbs_job_state.old_state = 'N'
        pbs_job_state.running = False
        pbs_job_state.runner_url = runner_url

        # Add to our 'queue' of jobs to monitor
        self.monitor_queue.put( pbs_job_state )
---|
| 312 | |
---|
| 313 | def monitor( self ): |
---|
| 314 | """ |
---|
| 315 | Watches jobs currently in the PBS queue and deals with state changes |
---|
| 316 | (queued to running) and job completion |
---|
| 317 | """ |
---|
| 318 | while 1: |
---|
| 319 | # Take any new watched jobs and put them on the monitor list |
---|
| 320 | try: |
---|
| 321 | while 1: |
---|
| 322 | pbs_job_state = self.monitor_queue.get_nowait() |
---|
| 323 | if pbs_job_state is self.STOP_SIGNAL: |
---|
| 324 | # TODO: This is where any cleanup would occur |
---|
| 325 | return |
---|
| 326 | self.watched.append( pbs_job_state ) |
---|
| 327 | except Empty: |
---|
| 328 | pass |
---|
| 329 | # Iterate over the list of watched jobs and check state |
---|
| 330 | try: |
---|
| 331 | self.check_watched_items() |
---|
| 332 | except: |
---|
| 333 | log.exception( "Uncaught exception checking jobs" ) |
---|
| 334 | # Sleep a bit before the next state check |
---|
| 335 | time.sleep( 1 ) |
---|
| 336 | |
---|
    def check_watched_items( self ):
        """
        Called by the monitor thread to look at each watched job and deal
        with state changes.

        Jobs that should keep being watched are collected into a new list
        which replaces self.watched at the end; jobs that finished or
        failed are handed to the worker threads via self.work_queue.
        """
        new_watched = []
        # reduce pbs load by batching status queries
        ( failures, statuses ) = self.check_all_jobs()
        for pbs_job_state in self.watched:
            job_id = pbs_job_state.job_id
            galaxy_job_id = pbs_job_state.job_wrapper.job_id
            old_state = pbs_job_state.old_state
            pbs_server_name = self.determine_pbs_server( pbs_job_state.runner_url )
            if pbs_server_name in failures:
                # Server was unreachable this pass: keep watching, retry later
                log.debug( "(%s/%s) Skipping state check because PBS server connection failed" % ( galaxy_job_id, job_id ) )
                new_watched.append( pbs_job_state )
                continue
            try:
                status = statuses[job_id]
            except KeyError:
                # Job missing from the batched results: it either left the
                # queue (finished) or the batched query missed it
                try:
                    # Recheck to make sure it wasn't a communication problem
                    self.check_single_job( pbs_server_name, job_id )
                    log.warning( "(%s/%s) PBS job was not in state check list, but was found with individual state check" % ( galaxy_job_id, job_id ) )
                    new_watched.append( pbs_job_state )
                except:
                    errno, text = pbs.error()
                    if errno == 15001:
                        # 15001 == job not in queue
                        log.debug("(%s/%s) PBS job has left queue" % (galaxy_job_id, job_id) )
                        self.work_queue.put( ( 'finish', pbs_job_state ) )
                    else:
                        # Unhandled error, continue to monitor
                        log.info("(%s/%s) PBS state check resulted in error (%d): %s" % (galaxy_job_id, job_id, errno, text) )
                        new_watched.append( pbs_job_state )
                continue
            if status.job_state != old_state:
                log.debug("(%s/%s) PBS job state changed from %s to %s" % ( galaxy_job_id, job_id, old_state, status.job_state ) )
            if status.job_state == "R" and not pbs_job_state.running:
                # First time the job has been seen running: update Galaxy state
                pbs_job_state.running = True
                pbs_job_state.job_wrapper.change_state( model.Job.states.RUNNING )
            if status.job_state == "R" and ( pbs_job_state.check_count % 20 ) == 0:
                # Every 20th time the job status is checked, do limit checks (if configured)
                if self.app.config.output_size_limit > 0:
                    # Check the size of the job outputs
                    fail = False
                    for outfile, size in pbs_job_state.job_wrapper.check_output_sizes():
                        if size > self.app.config.output_size_limit:
                            # NOTE(review): fail_message is a sentence fragment;
                            # presumably job_wrapper.fail appends trailing text
                            # (e.g. contact info) -- confirm.
                            pbs_job_state.fail_message = 'Job output grew too large (greater than %s), please try different job parameters or' \
                                % nice_size( self.app.config.output_size_limit )
                            log.warning( '(%s/%s) Dequeueing job due to output %s growing larger than %s limit' \
                                % ( galaxy_job_id, job_id, os.path.basename( outfile ), nice_size( self.app.config.output_size_limit ) ) )
                            pbs_job_state.stop_job = True
                            self.work_queue.put( ( 'fail', pbs_job_state ) )
                            fail = True
                            break
                    if fail:
                        continue
                if self.job_walltime is not None:
                    # Check the job's execution time
                    if status.get( 'resources_used', False ):
                        # resources_used may not be in the status for new jobs
                        h, m, s = [ int( i ) for i in status.resources_used.walltime.split( ':' ) ]
                        time_executing = timedelta( 0, s, 0, 0, m, h )
                        if time_executing > self.job_walltime:
                            pbs_job_state.fail_message = 'Job ran longer than maximum allowed execution time (%s), please try different job parameters or' \
                                % self.app.config.job_walltime
                            log.warning( '(%s/%s) Dequeueing job since walltime has been reached' \
                                % ( galaxy_job_id, job_id ) )
                            pbs_job_state.stop_job = True
                            self.work_queue.put( ( 'fail', pbs_job_state ) )
                            continue
            elif status.job_state == "C":
                # "keep_completed" is enabled in PBS, so try to check exit status
                try:
                    assert int( status.exit_status ) == 0
                    log.debug("(%s/%s) PBS job has completed successfully" % ( galaxy_job_id, job_id ) )
                except AssertionError:
                    pbs_job_state.fail_message = 'Job cannot be completed due to a cluster error. Please retry or'
                    log.error( '(%s/%s) PBS job failed: %s' % ( galaxy_job_id, job_id, JOB_EXIT_STATUS.get( int( status.exit_status ), 'Unknown error: %s' % status.exit_status ) ) )
                    self.work_queue.put( ( 'fail', pbs_job_state ) )
                    continue
                except AttributeError:
                    # No exit_status, can't verify proper completion so we just have to assume success.
                    log.debug("(%s/%s) PBS job has completed" % ( galaxy_job_id, job_id ) )
                self.work_queue.put( ( 'finish', pbs_job_state ) )
                continue
            pbs_job_state.old_state = status.job_state
            new_watched.append( pbs_job_state )
        # Replace the watch list with the updated version
        self.watched = new_watched
---|
| 428 | |
---|
    def check_all_jobs( self ):
        """
        Returns a list of servers that failed to be contacted and a dict
        of "job_id : status" pairs (where status is a bunchified version
        of the API's structure.
        """
        servers = []
        failures = []
        statuses = {}
        # Collect the distinct set of servers across all watched jobs, and
        # bump each job's check counter (drives the periodic limit checks
        # in check_watched_items)
        for pbs_job_state in self.watched:
            pbs_server_name = self.determine_pbs_server( pbs_job_state.runner_url )
            if pbs_server_name not in servers:
                servers.append( pbs_server_name )
            pbs_job_state.check_count += 1
        # One batched status query per server instead of one per job
        for pbs_server_name in servers:
            c = pbs.pbs_connect( pbs_server_name )
            if c <= 0:
                log.debug("connection to PBS server %s for state check failed" % pbs_server_name )
                failures.append( pbs_server_name )
                continue
            # Only request the attributes we use: state, resources used,
            # and exit status
            stat_attrl = pbs.new_attrl(3)
            stat_attrl[0].name = pbs.ATTR_state
            stat_attrl[1].name = pbs.ATTR_used
            stat_attrl[2].name = pbs.ATTR_exitstat
            # A None job id queries all jobs on the server
            jobs = pbs.pbs_statjob( c, None, stat_attrl, None )
            pbs.pbs_disconnect( c )
            statuses.update( self.convert_statjob_to_bunches( jobs ) )
        return( ( failures, statuses ) )
---|
| 457 | |
---|
| 458 | def convert_statjob_to_bunches( self, statjob_out ): |
---|
| 459 | statuses = {} |
---|
| 460 | for job in statjob_out: |
---|
| 461 | status = {} |
---|
| 462 | for attrib in job.attribs: |
---|
| 463 | if attrib.resource is None: |
---|
| 464 | status[ attrib.name ] = attrib.value |
---|
| 465 | else: |
---|
| 466 | if attrib.name not in status: |
---|
| 467 | status[ attrib.name ] = Bunch() |
---|
| 468 | status[ attrib.name ][ attrib.resource ] = attrib.value |
---|
| 469 | statuses[ job.name ] = Bunch( **status ) |
---|
| 470 | return statuses |
---|
| 471 | |
---|
    def check_single_job( self, pbs_server_name, job_id ):
        """
        Returns the state of a single job, used to make sure a job is
        really dead.
        """
        c = pbs.pbs_connect( pbs_server_name )
        if c <= 0:
            log.debug("connection to PBS server %s for state check failed" % pbs_server_name )
            return None
        # Only the job-state attribute is needed here
        stat_attrl = pbs.new_attrl(1)
        stat_attrl[0].name = pbs.ATTR_state
        jobs = pbs.pbs_statjob( c, job_id, stat_attrl, None )
        pbs.pbs_disconnect( c )
        # NOTE(review): assumes the job was found; jobs[0] raises otherwise.
        # The caller (check_watched_items) wraps this call in a try/except
        # and treats the failure as "job left the queue".
        return jobs[0].attribs[0].value
---|
| 486 | |
---|
| 487 | def finish_job( self, pbs_job_state ): |
---|
| 488 | """ |
---|
| 489 | Get the output/error for a finished job, pass to `job_wrapper.finish` |
---|
| 490 | and cleanup all the PBS temporary files. |
---|
| 491 | """ |
---|
| 492 | ofile = pbs_job_state.ofile |
---|
| 493 | efile = pbs_job_state.efile |
---|
| 494 | job_file = pbs_job_state.job_file |
---|
| 495 | # collect the output |
---|
| 496 | try: |
---|
| 497 | ofh = file(ofile, "r") |
---|
| 498 | efh = file(efile, "r") |
---|
| 499 | stdout = ofh.read() |
---|
| 500 | stderr = efh.read() |
---|
| 501 | except: |
---|
| 502 | stdout = '' |
---|
| 503 | stderr = 'Job output not returned by PBS: the output datasets were deleted while the job was running, the job was manually dequeued or there was a cluster error.' |
---|
| 504 | log.debug(stderr) |
---|
| 505 | |
---|
| 506 | try: |
---|
| 507 | pbs_job_state.job_wrapper.finish( stdout, stderr ) |
---|
| 508 | except: |
---|
| 509 | log.exception("Job wrapper finish method failed") |
---|
| 510 | pbs_job_state.job_wrapper.fail("Unable to finish job", exception=True) |
---|
| 511 | |
---|
| 512 | # clean up the pbs files |
---|
| 513 | self.cleanup( ( ofile, efile, job_file ) ) |
---|
| 514 | |
---|
| 515 | def fail_job( self, pbs_job_state ): |
---|
| 516 | """ |
---|
| 517 | Seperated out so we can use the worker threads for it. |
---|
| 518 | """ |
---|
| 519 | if pbs_job_state.stop_job: |
---|
| 520 | self.stop_job( self.sa_session.query( self.app.model.Job ).get( pbs_job_state.job_wrapper.job_id ) ) |
---|
| 521 | pbs_job_state.job_wrapper.fail( pbs_job_state.fail_message ) |
---|
| 522 | self.cleanup( ( pbs_job_state.ofile, pbs_job_state.efile, pbs_job_state.job_file ) ) |
---|
| 523 | |
---|
| 524 | def cleanup( self, files ): |
---|
| 525 | if not asbool( self.app.config.get( 'debug', False ) ): |
---|
| 526 | for file in files: |
---|
| 527 | if os.access( file, os.R_OK ): |
---|
| 528 | os.unlink( file ) |
---|
| 529 | |
---|
    def put( self, job_wrapper ):
        """Add a job to the queue (by job identifier)"""
        # Change to queued state before handing to worker thread so the runner won't pick it up again
        job_wrapper.change_state( model.Job.states.QUEUED )
        # A worker thread (run_next) will pick this up and call queue_job
        self.work_queue.put( ( 'queue', job_wrapper ) )
---|
| 535 | |
---|
| 536 | def shutdown( self ): |
---|
| 537 | """Attempts to gracefully shut down the monitor thread""" |
---|
| 538 | log.info( "sending stop signal to worker threads" ) |
---|
| 539 | self.monitor_queue.put( self.STOP_SIGNAL ) |
---|
| 540 | for i in range( len( self.work_threads ) ): |
---|
| 541 | self.work_queue.put( ( self.STOP_SIGNAL, None ) ) |
---|
| 542 | log.info( "pbs job runner stopped" ) |
---|
| 543 | |
---|
| 544 | def get_stage_in_out( self, fnames, symlink=False ): |
---|
| 545 | """Convenience function to create a stagein/stageout list""" |
---|
| 546 | stage = '' |
---|
| 547 | for fname in fnames: |
---|
| 548 | if os.access(fname, os.R_OK): |
---|
| 549 | if stage: |
---|
| 550 | stage += ',' |
---|
| 551 | # pathnames are now absolute |
---|
| 552 | if symlink and self.app.config.pbs_stage_path: |
---|
| 553 | stage_name = os.path.join(self.app.config.pbs_stage_path, os.path.split(fname)[1]) |
---|
| 554 | else: |
---|
| 555 | stage_name = fname |
---|
| 556 | stage += "%s@%s:%s" % (stage_name, self.app.config.pbs_dataset_server, fname) |
---|
| 557 | return stage |
---|
| 558 | |
---|
    def stop_job( self, job ):
        """Attempts to delete a job from the PBS queue"""
        pbs_server_name = self.determine_pbs_server( str( job.job_runner_name ) )
        c = pbs.pbs_connect( pbs_server_name )
        if c <= 0:
            # Best effort: if the server is unreachable, just give up
            log.debug("(%s/%s) Connection to PBS server for job delete failed" % ( job.id, job.job_runner_external_id ) )
            return
        # 'NULL' is passed as the extend argument -- presumably the string
        # form expected by this pbs-python binding; confirm against the
        # pbs_deljob documentation.
        pbs.pbs_deljob( c, str( job.job_runner_external_id ), 'NULL' )
        pbs.pbs_disconnect( c )
        log.debug( "(%s/%s) Removed from PBS queue before job completion" % ( job.id, job.job_runner_external_id ) )
---|
| 569 | |
---|
| 570 | def recover( self, job, job_wrapper ): |
---|
| 571 | """Recovers jobs stuck in the queued/running state when Galaxy started""" |
---|
| 572 | pbs_job_state = PBSJobState() |
---|
| 573 | pbs_job_state.ofile = "%s/%s.o" % (self.app.config.cluster_files_directory, job.id) |
---|
| 574 | pbs_job_state.efile = "%s/%s.e" % (self.app.config.cluster_files_directory, job.id) |
---|
| 575 | pbs_job_state.job_file = "%s/%s.sh" % (self.app.config.cluster_files_directory, job.id) |
---|
| 576 | pbs_job_state.job_id = str( job.job_runner_external_id ) |
---|
| 577 | pbs_job_state.runner_url = job_wrapper.tool.job_runner |
---|
| 578 | job_wrapper.command_line = job.command_line |
---|
| 579 | pbs_job_state.job_wrapper = job_wrapper |
---|
| 580 | if job.state == model.Job.states.RUNNING: |
---|
| 581 | log.debug( "(%s/%s) is still in running state, adding to the PBS queue" % ( job.id, job.job_runner_external_id ) ) |
---|
| 582 | pbs_job_state.old_state = 'R' |
---|
| 583 | pbs_job_state.running = True |
---|
| 584 | self.monitor_queue.put( pbs_job_state ) |
---|
| 585 | elif job.state == model.Job.states.QUEUED: |
---|
| 586 | log.debug( "(%s/%s) is still in PBS queued state, adding to the PBS queue" % ( job.id, job.job_runner_external_id ) ) |
---|
| 587 | pbs_job_state.old_state = 'Q' |
---|
| 588 | pbs_job_state.running = False |
---|
| 589 | self.monitor_queue.put( pbs_job_state ) |
---|