import os, logging, threading, time
from Queue import Queue, Empty

from galaxy import model
from paste.deploy.converters import asbool

import pkg_resources

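# Attempt to load the drmaa egg up front; if it is missing or broken, the
# exception text is stashed in 'drmaa' and the failure is deferred until a
# DRMAAJobRunner is actually constructed, so Galaxy can still start when
# this runner is not in use.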
try:
    pkg_resources.require( "drmaa" )
    drmaa = __import__( "drmaa" )
except Exception, e:
    drmaa = str( e )

log = logging.getLogger( __name__ )

if type( drmaa ) != str:
    drmaa_state = {
        drmaa.JobState.UNDETERMINED: 'process status cannot be determined',
        drmaa.JobState.QUEUED_ACTIVE: 'job is queued and active',
        drmaa.JobState.SYSTEM_ON_HOLD: 'job is queued and in system hold',
        drmaa.JobState.USER_ON_HOLD: 'job is queued and in user hold',
        drmaa.JobState.USER_SYSTEM_ON_HOLD: 'job is queued and in user and system hold',
        drmaa.JobState.RUNNING: 'job is running',
        drmaa.JobState.SYSTEM_SUSPENDED: 'job is system suspended',
        drmaa.JobState.USER_SUSPENDED: 'job is user suspended',
        drmaa.JobState.DONE: 'job finished normally',
        drmaa.JobState.FAILED: 'job finished, but failed',
    }

drm_template = """#!/bin/sh
#$ -S /bin/sh
GALAXY_LIB="%s"
if [ "$GALAXY_LIB" != "None" ]; then
    if [ -n "$PYTHONPATH" ]; then
        PYTHONPATH="$GALAXY_LIB:$PYTHONPATH"
    else
        PYTHONPATH="$GALAXY_LIB"
    fi
    export PYTHONPATH
fi
cd %s
%s
"""
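# The three '%s' slots above are filled in by queue_job() below: the Galaxy
# lib directory (prepended to PYTHONPATH on the execution host), the job's
# working directory, and the tool's command line.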

class DRMAAJobState( object ):
    def __init__( self ):
        """
        Encapsulates state related to a job that is being run via the DRM and
        that we need to monitor.
        """
        self.job_wrapper = None    # Galaxy-side JobWrapper for the job
        self.job_id = None         # external job id assigned by the DRM
        self.old_state = None      # last DRMAA state seen by the monitor
        self.running = False       # True once the job enters RUNNING
        self.job_file = None       # path to the submitted job script
        self.ofile = None          # path to the captured stdout
        self.efile = None          # path to the captured stderr
        self.runner_url = None     # runner URL the job was submitted with

class DRMAAJobRunner( object ):
    """
    Job runner backed by a finite pool of worker threads; scheduling is FIFO.
    """
    STOP_SIGNAL = object()
    def __init__( self, app ):
        """Initialize this job runner and start the monitor thread"""
        # Check if drmaa was importable, fail if not
        if type( drmaa ) == str:
            raise Exception( "DRMAAJobRunner requires drmaa module which could not be loaded: %s" % drmaa )
        self.app = app
        self.sa_session = app.model.context
        # 'watched' and 'monitor_queue' are both used to keep track of jobs to
        # watch. 'monitor_queue' is used to add new watched jobs, and can be
        # filled from any thread (usually by the 'queue_job' method). 'watched'
        # must only be modified by the monitor thread, which will move items
        # from 'monitor_queue' to 'watched' and then manage the watched jobs.
        self.watched = []
        self.monitor_queue = Queue()
        self.ds = drmaa.Session()
        self.ds.initialize()
        self.monitor_thread = threading.Thread( target=self.monitor )
        self.monitor_thread.start()
        self.work_queue = Queue()
        self.work_threads = []
        nworkers = app.config.cluster_job_queue_workers
        for i in range( nworkers ):
            worker = threading.Thread( target=self.run_next )
            worker.start()
            self.work_threads.append( worker )
        log.debug( "%d workers ready" % nworkers )
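    # A single DRMAA session is created per runner and shared by the monitor
    # and worker threads; DRMAA 1.x implementations generally allow only one
    # active session per process.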

    def get_native_spec( self, url ):
        """Get any native DRM arguments specified by the site configuration"""
        try:
            return url.split('/')[2] or None
        except:
            return None
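    # For example, assuming the usual 'drmaa://<native_spec>/' runner URL
    # layout, 'drmaa://-pe threads 4/' yields the native specification
    # '-pe threads 4', while a bare 'drmaa://' (or a malformed URL) yields None.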

    def run_next( self ):
        """
        Run the next item in the work queue (a job waiting to run or finish).
        """
        while 1:
            # Items are ( operation, payload ) tuples; the payload is a
            # JobWrapper for 'queue' and a DRMAAJobState for 'finish'/'fail'.
            ( op, obj ) = self.work_queue.get()
            if op is self.STOP_SIGNAL:
                return
            try:
                if op == 'queue':
                    self.queue_job( obj )
                elif op == 'finish':
                    self.finish_job( obj )
                elif op == 'fail':
                    self.fail_job( obj )
            except:
                log.exception( "Uncaught exception %sing job" % op )

    def queue_job( self, job_wrapper ):
        """Create job script and submit it to the DRM"""

        try:
            job_wrapper.prepare()
            command_line = job_wrapper.get_command_line()
        except:
            job_wrapper.fail( "failure preparing job", exception=True )
            log.exception( "failure running job %d" % job_wrapper.job_id )
            return

        runner_url = job_wrapper.tool.job_runner

        # This is silly, why would we queue a job with no command line?
        if not command_line:
            job_wrapper.finish( '', '' )
            return

        # Check for deletion before we change state
        if job_wrapper.get_state() == model.Job.states.DELETED:
            log.debug( "Job %s deleted by user before it entered the queue" % job_wrapper.job_id )
            job_wrapper.cleanup()
            return

        # Change to queued state immediately
        job_wrapper.change_state( model.Job.states.QUEUED )

        # define job attributes
        ofile = "%s/database/pbs/%s.o" % ( os.getcwd(), job_wrapper.job_id )
        efile = "%s/database/pbs/%s.e" % ( os.getcwd(), job_wrapper.job_id )
        jt = self.ds.createJobTemplate()
        jt.remoteCommand = "%s/database/pbs/galaxy_%s.sh" % ( os.getcwd(), job_wrapper.job_id )
        # DRMAA output/error paths take the form '[hostname]:path'; the
        # leading ':' leaves the hostname unspecified.
        jt.outputPath = ":%s" % ofile
        jt.errorPath = ":%s" % efile
        native_spec = self.get_native_spec( runner_url )
        if native_spec is not None:
            jt.nativeSpecification = native_spec

        script = drm_template % ( job_wrapper.galaxy_lib_dir, os.path.abspath( job_wrapper.working_directory ), command_line )
        if self.app.config.set_metadata_externally:
            script += "cd %s\n" % os.path.abspath( os.getcwd() )
            script += "%s\n" % job_wrapper.setup_external_metadata( exec_dir = os.path.abspath( os.getcwd() ),
                                                                    tmp_dir = self.app.config.new_file_path,
                                                                    dataset_files_path = self.app.model.Dataset.file_path,
                                                                    output_fnames = job_wrapper.get_output_fnames(),
                                                                    set_extension = False,
                                                                    kwds = { 'overwrite' : False } ) # we don't want to overwrite metadata that was copied over in init_meta(), as per established behavior
        fh = file( jt.remoteCommand, "w" )
        fh.write( script )
        fh.close()
        os.chmod( jt.remoteCommand, 0750 )
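        # The job script and output/error files all live under Galaxy's own
        # 'database/pbs' directory, so this runner assumes a filesystem shared
        # between the Galaxy server and the cluster execution hosts.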

        # job was deleted while we were preparing it
        if job_wrapper.get_state() == model.Job.states.DELETED:
            log.debug( "Job %s deleted by user before it entered the queue" % job_wrapper.job_id )
            self.cleanup( ( ofile, efile, jt.remoteCommand ) )
            job_wrapper.cleanup()
            return

        galaxy_job_id = job_wrapper.job_id
        log.debug( "(%s) submitting file %s" % ( galaxy_job_id, jt.remoteCommand ) )
        log.debug( "(%s) command is: %s" % ( galaxy_job_id, command_line ) )
        # runJob will raise if there's a submit problem
        job_id = self.ds.runJob( jt )
        log.info( "(%s) queued as %s" % ( galaxy_job_id, job_id ) )

        # store runner information for tracking if Galaxy restarts
        job_wrapper.set_runner( runner_url, job_id )

        # Store DRM related state information for job
        drm_job_state = DRMAAJobState()
        drm_job_state.job_wrapper = job_wrapper
        drm_job_state.job_id = job_id
        drm_job_state.ofile = ofile
        drm_job_state.efile = efile
        drm_job_state.job_file = jt.remoteCommand
        drm_job_state.old_state = 'new'
        drm_job_state.running = False
        drm_job_state.runner_url = runner_url

        # the job template is no longer needed once the job has been submitted
        self.ds.deleteJobTemplate( jt )

        # Add to our 'queue' of jobs to monitor
        self.monitor_queue.put( drm_job_state )

    def monitor( self ):
        """
        Watches jobs currently in the DRM queue and deals with state changes
        (queued to running) and job completion.
        """
        while 1:
            # Take any new watched jobs and put them on the monitor list
            try:
                while 1:
                    drm_job_state = self.monitor_queue.get_nowait()
                    if drm_job_state is self.STOP_SIGNAL:
                        # TODO: This is where any cleanup would occur
                        self.ds.exit()
                        return
                    self.watched.append( drm_job_state )
            except Empty:
                pass
            # Iterate over the list of watched jobs and check state
            self.check_watched_items()
            # Sleep a bit before the next state check
            time.sleep( 1 )
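    # The one-second sleep above sets the polling interval: shorter values
    # surface state changes sooner at the cost of more frequent DRM queries.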

    def check_watched_items( self ):
        """
        Called by the monitor thread to look at each watched job and deal
        with state changes.
        """
        new_watched = []
        for drm_job_state in self.watched:
            job_id = drm_job_state.job_id
            galaxy_job_id = drm_job_state.job_wrapper.job_id
            old_state = drm_job_state.old_state
            try:
                state = self.ds.jobStatus( job_id )
            except drmaa.InvalidJobException:
                # we should only get here if an orphaned job was put into the queue at app startup
                log.debug( "(%s/%s) job left DRM queue" % ( galaxy_job_id, job_id ) )
                self.work_queue.put( ( 'finish', drm_job_state ) )
                continue
            except Exception, e:
                # so we don't kill the monitor thread
                log.exception( "(%s/%s) Unable to check job status" % ( galaxy_job_id, job_id ) )
                log.warning( "(%s/%s) job will now be errored" % ( galaxy_job_id, job_id ) )
                drm_job_state.fail_message = "Cluster could not complete job"
                self.work_queue.put( ( 'fail', drm_job_state ) )
                continue
            if state != old_state:
                log.debug( "(%s/%s) state change: %s" % ( galaxy_job_id, job_id, drmaa_state[state] ) )
            if state == drmaa.JobState.RUNNING and not drm_job_state.running:
                drm_job_state.running = True
                drm_job_state.job_wrapper.change_state( model.Job.states.RUNNING )
            if state in ( drmaa.JobState.DONE, drmaa.JobState.FAILED ):
                self.work_queue.put( ( 'finish', drm_job_state ) )
                continue
            drm_job_state.old_state = state
            new_watched.append( drm_job_state )
        # Replace the watch list with the updated version
        self.watched = new_watched
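    # Jobs that have finished or errored are handed to the worker threads and
    # dropped from the watch list; everything else is carried over into
    # new_watched for the next polling pass.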

    def finish_job( self, drm_job_state ):
        """
        Get the output/error for a finished job, pass to `job_wrapper.finish`
        and cleanup all the DRM temporary files.
        """
        ofile = drm_job_state.ofile
        efile = drm_job_state.efile
        job_file = drm_job_state.job_file
        # collect the output
        try:
            ofh = file( ofile, "r" )
            efh = file( efile, "r" )
            stdout = ofh.read()
            stderr = efh.read()
        except:
            stdout = ''
            stderr = 'Job output not returned from cluster'
            log.debug( stderr )

        try:
            drm_job_state.job_wrapper.finish( stdout, stderr )
        except:
            log.exception( "Job wrapper finish method failed" )

        # clean up the drm files
        self.cleanup( ( ofile, efile, job_file ) )

    def fail_job( self, drm_job_state ):
        """
        Separated out so we can use the worker threads for it.
        """
        self.stop_job( self.sa_session.query( self.app.model.Job ).get( drm_job_state.job_wrapper.job_id ) )
        drm_job_state.job_wrapper.fail( drm_job_state.fail_message )
        self.cleanup( ( drm_job_state.ofile, drm_job_state.efile, drm_job_state.job_file ) )

    def cleanup( self, files ):
        # keep the job files around for inspection when 'debug' is enabled
        if not asbool( self.app.config.get( 'debug', False ) ):
            for filename in files:
                if os.access( filename, os.R_OK ):
                    os.unlink( filename )

    def put( self, job_wrapper ):
        """Add a job to the queue (by job identifier)"""
        # Change to queued state before handing to worker thread so the runner won't pick it up again
        job_wrapper.change_state( model.Job.states.QUEUED )
        self.work_queue.put( ( 'queue', job_wrapper ) )

    def shutdown( self ):
        """Attempts to gracefully shut down the monitor and worker threads"""
        log.info( "sending stop signal to monitor and worker threads" )
        self.monitor_queue.put( self.STOP_SIGNAL )
        for i in range( len( self.work_threads ) ):
            self.work_queue.put( ( self.STOP_SIGNAL, None ) )
        log.info( "drmaa job runner stopped" )
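    # The stop signals are queued but the threads are not joined here; the
    # monitor thread tears down the DRMAA session (self.ds.exit()) when it
    # sees its STOP_SIGNAL.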

    def stop_job( self, job ):
        """Attempts to delete a job from the DRM queue"""
        try:
            self.ds.control( job.job_runner_external_id, drmaa.JobControlAction.TERMINATE )
            log.debug( "(%s/%s) Removed from DRM queue at user's request" % ( job.id, job.job_runner_external_id ) )
        except drmaa.InvalidJobException:
            log.debug( "(%s/%s) User killed running job, but it was already dead" % ( job.id, job.job_runner_external_id ) )
        except Exception, e:
            log.debug( "(%s/%s) User killed running job, but error encountered removing from DRM queue: %s" % ( job.id, job.job_runner_external_id, e ) )

    def recover( self, job, job_wrapper ):
        """Recovers jobs stuck in the queued/running state when Galaxy started"""
        drm_job_state = DRMAAJobState()
        drm_job_state.ofile = "%s/database/pbs/%s.o" % ( os.getcwd(), job.id )
        drm_job_state.efile = "%s/database/pbs/%s.e" % ( os.getcwd(), job.id )
        drm_job_state.job_file = "%s/database/pbs/galaxy_%s.sh" % ( os.getcwd(), job.id )
        drm_job_state.job_id = str( job.job_runner_external_id )
        drm_job_state.runner_url = job_wrapper.tool.job_runner
        job_wrapper.command_line = job.command_line
        drm_job_state.job_wrapper = job_wrapper
        if job.state == model.Job.states.RUNNING:
            log.debug( "(%s/%s) is still in running state, adding to the DRM queue" % ( job.id, job.job_runner_external_id ) )
            drm_job_state.old_state = drmaa.JobState.RUNNING
            drm_job_state.running = True
            self.monitor_queue.put( drm_job_state )
        elif job.state == model.Job.states.QUEUED:
            log.debug( "(%s/%s) is still in DRM queued state, adding to the DRM queue" % ( job.id, job.job_runner_external_id ) )
            drm_job_state.old_state = drmaa.JobState.QUEUED_ACTIVE
            drm_job_state.running = False
            self.monitor_queue.put( drm_job_state )
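    # Note: recover() rebuilds the job file paths from the Galaxy job id using
    # the same conventions as queue_job(); jobs found in any state other than
    # RUNNING or QUEUED are not re-watched.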