| 1 | import os, logging, threading, time | 
|---|
| 2 | from Queue import Queue, Empty | 
|---|
| 3 |  | 
|---|
| 4 | from galaxy import model | 
|---|
| 5 | from paste.deploy.converters import asbool | 
|---|
| 6 |  | 
|---|
| 7 | import pkg_resources | 
|---|
| 8 |  | 
|---|
# DRMAA_python is an optional dependency.  When it cannot be located or
# imported, DRMAA is left as None so that this module can still be imported.
try:
    pkg_resources.require( "DRMAA_python" )
    DRMAA = __import__( "DRMAA" )
except Exception:
    # Deliberately not a bare except: KeyboardInterrupt / SystemExit raised
    # while importing should propagate instead of being treated as
    # "DRMAA is not installed".
    DRMAA = None

# Module-level logger shared by everything in this file.
log = logging.getLogger( __name__ )
|---|
| 16 |  | 
|---|
# Human readable description for each DRMAA session state constant.  The
# table can only be built when the DRMAA bindings were imported, because the
# keys are attributes of DRMAA.Session.
if DRMAA is not None:
    DRMAA_state = dict( [
        ( DRMAA.Session.UNDETERMINED, 'process status cannot be determined' ),
        ( DRMAA.Session.QUEUED_ACTIVE, 'job is queued and waiting to be scheduled' ),
        ( DRMAA.Session.SYSTEM_ON_HOLD, 'job is queued and in system hold' ),
        ( DRMAA.Session.USER_ON_HOLD, 'job is queued and in user hold' ),
        ( DRMAA.Session.USER_SYSTEM_ON_HOLD, 'job is queued and in user and system hold' ),
        ( DRMAA.Session.RUNNING, 'job is running' ),
        ( DRMAA.Session.SYSTEM_SUSPENDED, 'job is system suspended' ),
        ( DRMAA.Session.USER_SUSPENDED, 'job is user suspended' ),
        ( DRMAA.Session.DONE, 'job finished normally' ),
        ( DRMAA.Session.FAILED, 'job finished, but failed' ),
    ] )
|---|
| 30 |  | 
|---|
# Shell script template for a job submitted to SGE.  The three %s
# placeholders are filled in, in order, with: the Galaxy lib directory (put at
# the front of PYTHONPATH unless it is the string "None"), the directory to
# cd into before running, and the command line to execute.
sge_template = """#!/bin/sh
#$ -S /bin/sh
GALAXY_LIB="%s"
if [ "$GALAXY_LIB" != "None" ]; then
if [ -n "$PYTHONPATH" ]; then
PYTHONPATH="$GALAXY_LIB:$PYTHONPATH"
else
PYTHONPATH="$GALAXY_LIB"
fi
export PYTHONPATH
fi
cd %s
%s
"""
|---|
| 45 |  | 
|---|
class SGEJobState( object ):
    """
    Encapsulates state related to a job that is being run via SGE and
    that we need to monitor.
    """
    def __init__( self ):
        """Create an empty record; the job runner fills the fields in."""
        # Galaxy-side wrapper of the job being tracked
        self.job_wrapper = None
        # Identifier of the job on the SGE side
        self.job_id = None
        # Last state seen for the job, used to detect state changes
        self.old_state = None
        # True once the job has been seen in the running state
        self.running = False
        # Path of the generated job script
        self.job_file = None
        # Paths of the files receiving the job's stdout / stderr
        self.ofile = None
        self.efile = None
        # Runner URL the job was submitted with
        self.runner_url = None
        # Message to report when the job has to be failed.  Declared here so
        # the attribute always exists; the monitor sets it before queueing a
        # 'fail' operation.
        self.fail_message = None
|---|
| 60 |  | 
|---|
class SGEJobRunner( object ):
    """
    Job runner backed by a finite pool of worker threads. FIFO scheduling

    Jobs are submitted to SGE through a DRMAA session.  Worker threads
    consume ( operation, object ) tuples from `work_queue`; a single monitor
    thread polls the state of submitted jobs and queues 'finish' / 'fail'
    operations for the workers.
    """
    STOP_SIGNAL = object()
    # Permissions given to generated job scripts (rwxr-x---).  Spelled with
    # int() because the octal literal syntax differs between Python versions.
    _JOB_SCRIPT_MODE = int( '750', 8 )

    def __init__( self, app ):
        """
        Initialize this job runner and start the monitor and worker threads.

        Reads `app.model.context`, `app.config.default_cluster_job_runner`
        and `app.config.cluster_job_queue_workers`.

        Raises Exception if the DRMAA_python bindings were not importable.
        """
        # Check if SGE was importable, fail if not
        if DRMAA is None:
            raise Exception( "SGEJobRunner requires DRMAA_python which was not found" )
        self.app = app
        self.sa_session = app.model.context
        # 'watched' and 'queue' are both used to keep track of jobs to watch.
        # 'queue' is used to add new watched jobs, and can be called from
        # any thread (usually by the 'queue_job' method). 'watched' must only
        # be modified by the monitor thread, which will move items from 'queue'
        # to 'watched' and then manage the watched jobs.
        self.watched = []
        self.monitor_queue = Queue()
        # The monitor thread pushes onto the work queue, so the queue has to
        # exist before that thread is started.
        self.work_queue = Queue()
        self.work_threads = []
        self.default_cell = self.determine_sge_cell( self.app.config.default_cluster_job_runner )
        self.ds = DRMAA.Session()
        self.ds.init( self.default_cell )
        self.monitor_thread = threading.Thread( target=self.monitor )
        self.monitor_thread.start()
        nworkers = app.config.cluster_job_queue_workers
        for i in range( nworkers ):
            worker = threading.Thread( target=self.run_next )
            worker.start()
            self.work_threads.append( worker )
        log.debug( "%d workers ready" % nworkers )

    def _url_field( self, url, index ):
        """Return field `index` of `url` split on '/', or None if it is missing or empty."""
        try:
            return url.split( '/' )[ index ] or None
        except ( AttributeError, IndexError ):
            # url is not a string, or it has fewer than index + 1 fields
            return None

    def _job_files( self, galaxy_job_id ):
        """Return the ( stdout, stderr, script ) paths used for a Galaxy job id."""
        cwd = os.getcwd()
        return ( "%s/database/pbs/%s.o" % ( cwd, galaxy_job_id ),
                 "%s/database/pbs/%s.e" % ( cwd, galaxy_job_id ),
                 "%s/database/pbs/galaxy_%s.sh" % ( cwd, galaxy_job_id ) )

    def _native_specification( self, runner_url ):
        """Return the native SGE options encoded in `runner_url`, or None if there are none."""
        options = []
        sge_queue_name = self.determine_sge_queue( runner_url )
        if sge_queue_name is not None:
            options.append( "-q '%s'" % sge_queue_name )
        sge_project_name = self.determine_sge_project( runner_url )
        if sge_project_name is not None:
            options.append( "-P '%s'" % sge_project_name )
        sge_extra_params = self.determine_sge_tool_parameters( runner_url )
        if sge_extra_params is not None:
            options.append( sge_extra_params )
        if options:
            return ' '.join( options )
        return None

    def _read_file( self, path ):
        """Return the whole content of `path`, closing the file handle in every case."""
        fh = open( path, "r" )
        try:
            return fh.read()
        finally:
            fh.close()

    def determine_sge_cell( self, url ):
        """
        Determine what SGE cell we are using

        Returns the third '/'-separated field of an 'sge:' url, and '' for
        any other url or when that field is absent.
        """
        url_split = url.split( "/" )
        # The length check keeps a short url such as 'sge:' from raising
        # IndexError.
        if url_split[0] == 'sge:' and len( url_split ) > 2:
            return url_split[2]
        # this could happen if sge is started, but is not the default runner
        return ''

    def determine_sge_queue( self, url ):
        """Determine what SGE queue we are submitting to (None if not given)"""
        return self._url_field( url, 3 )

    def determine_sge_project( self, url ):
        """Determine what SGE project we are submitting to (None if not given)"""
        return self._url_field( url, 4 )

    def determine_sge_tool_parameters( self, url ):
        """
        Determine what are the tool's specific parameters (None if not given)

        Only the sixth '/'-separated field is used, so anything after a
        further '/' in the url is ignored.
        """
        return self._url_field( url, 5 )

    def run_next( self ):
        """
        Run the next item in the queue (a job waiting to run or finish )

        Loops until a STOP_SIGNAL operation is received.  Exceptions raised
        while handling an operation are logged and do not end the loop.
        """
        while 1:
            ( op, obj ) = self.work_queue.get()
            if op is self.STOP_SIGNAL:
                return
            try:
                if op == 'queue':
                    self.queue_job( obj )
                elif op == 'finish':
                    self.finish_job( obj )
                elif op == 'fail':
                    self.fail_job( obj )
            except Exception:
                log.exception( "Uncaught exception %sing job" % op )

    def queue_job( self, job_wrapper ):
        """
        Create SGE script for a job and submit it to the SGE queue

        On success the job is handed to the monitor thread.  A submission
        error raised by the DRMAA session propagates to the caller.
        """
        try:
            job_wrapper.prepare()
            command_line = job_wrapper.get_command_line()
        except Exception:
            job_wrapper.fail( "failure preparing job", exception=True )
            log.exception( "failure running job %d" % job_wrapper.job_id )
            return

        runner_url = job_wrapper.tool.job_runner

        # This is silly, why would we queue a job with no command line?
        if not command_line:
            job_wrapper.finish( '', '' )
            return

        # Check for deletion before we change state
        if job_wrapper.get_state() == model.Job.states.DELETED:
            log.debug( "Job %s deleted by user before it entered the SGE queue" % job_wrapper.job_id )
            job_wrapper.cleanup()
            return

        # Change to queued state immediately
        job_wrapper.change_state( model.Job.states.QUEUED )

        if self.determine_sge_cell( runner_url ) != self.default_cell:
            # TODO: support multiple cells
            log.warning( "(%s) Using multiple SGE cells is not supported.  This job will be submitted to the default cell." % job_wrapper.job_id )
        sge_queue_name = self.determine_sge_queue( runner_url )
        native_spec = self._native_specification( runner_url )

        # define job attributes
        ofile, efile, job_file = self._job_files( job_wrapper.job_id )

        script = sge_template % ( job_wrapper.galaxy_lib_dir, os.path.abspath( job_wrapper.working_directory ), command_line )
        if self.app.config.set_metadata_externally:
            script += "cd %s\n" % os.path.abspath( os.getcwd() )
            # we don't want to overwrite metadata that was copied over in
            # init_meta(), as per established behavior
            script += "%s\n" % job_wrapper.setup_external_metadata( exec_dir = os.path.abspath( os.getcwd() ),
                                                                    tmp_dir = self.app.config.new_file_path,
                                                                    dataset_files_path = self.app.model.Dataset.file_path,
                                                                    output_fnames = job_wrapper.get_output_fnames(),
                                                                    set_extension = False,
                                                                    kwds = { 'overwrite' : False } )
        fh = open( job_file, "w" )
        try:
            fh.write( script )
        finally:
            fh.close()
        os.chmod( job_file, self._JOB_SCRIPT_MODE )

        # job was deleted while we were preparing it
        if job_wrapper.get_state() == model.Job.states.DELETED:
            log.debug( "Job %s deleted by user before it entered the SGE queue" % job_wrapper.job_id )
            self.cleanup( ( ofile, efile, job_file ) )
            job_wrapper.cleanup()
            return

        galaxy_job_id = job_wrapper.job_id
        log.debug( "(%s) submitting file %s" % ( galaxy_job_id, job_file ) )
        log.debug( "(%s) command is: %s" % ( galaxy_job_id, command_line ) )

        # The template is only needed for the submission itself, so it is
        # created as late as possible and always deleted, including when
        # runJob raises.
        jt = self.ds.createJobTemplate()
        try:
            jt.remoteCommand = job_file
            jt.outputPath = ":%s" % ofile
            jt.errorPath = ":%s" % efile
            if native_spec is not None:
                jt.nativeSpecification = native_spec
            # runJob will raise if there's a submit problem
            job_id = self.ds.runJob( jt )
        finally:
            # delete the job template
            self.ds.deleteJobTemplate( jt )

        if sge_queue_name is None:
            log.debug( "(%s) queued in default queue as %s" % ( galaxy_job_id, job_id ) )
        else:
            log.debug( "(%s) queued in %s queue as %s" % ( galaxy_job_id, sge_queue_name, job_id ) )

        # store runner information for tracking if Galaxy restarts
        job_wrapper.set_runner( runner_url, job_id )

        # Store SGE related state information for job
        sge_job_state = SGEJobState()
        sge_job_state.job_wrapper = job_wrapper
        sge_job_state.job_id = job_id
        sge_job_state.ofile = ofile
        sge_job_state.efile = efile
        sge_job_state.job_file = job_file
        sge_job_state.old_state = 'new'
        sge_job_state.running = False
        sge_job_state.runner_url = runner_url

        # Add to our 'queue' of jobs to monitor
        self.monitor_queue.put( sge_job_state )

    def monitor( self ):
        """
        Watches jobs currently in the SGE queue and deals with state changes
        (queued to running) and job completion

        Runs until STOP_SIGNAL is read from the monitor queue, at which point
        the DRMAA session is closed.
        """
        while 1:
            # Take any new watched jobs and put them on the monitor list
            try:
                while 1:
                    sge_job_state = self.monitor_queue.get_nowait()
                    if sge_job_state is self.STOP_SIGNAL:
                        # TODO: This is where any cleanup would occur
                        self.ds.exit()
                        return
                    self.watched.append( sge_job_state )
            except Empty:
                pass
            # Iterate over the list of watched jobs and check state
            self.check_watched_items()
            # Sleep a bit before the next state check
            time.sleep( 1 )

    def check_watched_items( self ):
        """
        Called by the monitor thread to look at each watched job and deal
        with state changes.

        Jobs that are done, failed, unknown to SGE or whose status cannot be
        read are handed to the worker threads and dropped from the watch
        list; all others stay watched.
        """
        new_watched = []
        for sge_job_state in self.watched:
            job_id = sge_job_state.job_id
            galaxy_job_id = sge_job_state.job_wrapper.job_id
            old_state = sge_job_state.old_state
            try:
                state = self.ds.getJobProgramStatus( job_id )
            except DRMAA.InvalidJobError:
                # we should only get here if an orphaned job was put into the queue at app startup
                log.debug( "(%s/%s) job left SGE queue" % ( galaxy_job_id, job_id ) )
                self.work_queue.put( ( 'finish', sge_job_state ) )
                continue
            except Exception:
                # so we don't kill the monitor thread
                log.exception( "(%s/%s) Unable to check job status" % ( galaxy_job_id, job_id ) )
                log.warning( "(%s/%s) job will now be errored" % ( galaxy_job_id, job_id ) )
                sge_job_state.fail_message = "Cluster could not complete job"
                self.work_queue.put( ( 'fail', sge_job_state ) )
                continue
            if state != old_state:
                # Use .get so that a state missing from the table cannot
                # raise KeyError and end the monitor thread.
                description = DRMAA_state.get( state, "unknown state: %s" % state )
                log.debug( "(%s/%s) state change: %s" % ( galaxy_job_id, job_id, description ) )
            if state == DRMAA.Session.RUNNING and not sge_job_state.running:
                sge_job_state.running = True
                sge_job_state.job_wrapper.change_state( model.Job.states.RUNNING )
            if state in ( DRMAA.Session.DONE, DRMAA.Session.FAILED ):
                self.work_queue.put( ( 'finish', sge_job_state ) )
                continue
            sge_job_state.old_state = state
            new_watched.append( sge_job_state )
        # Replace the watch list with the updated version
        self.watched = new_watched

    def finish_job( self, sge_job_state ):
        """
        Get the output/error for a finished job, pass to `job_wrapper.finish`
        and cleanup all the SGE temporary files.
        """
        ofile = sge_job_state.ofile
        efile = sge_job_state.efile
        job_file = sge_job_state.job_file
        # collect the output
        try:
            stdout = self._read_file( ofile )
            stderr = self._read_file( efile )
        except Exception:
            # Either file being unreadable discards both, as before
            stdout = ''
            stderr = 'Job output not returned from cluster'
            log.debug( stderr )

        try:
            sge_job_state.job_wrapper.finish( stdout, stderr )
        except Exception:
            log.exception( "Job wrapper finish method failed" )

        # clean up the sge files
        self.cleanup( ( ofile, efile, job_file ) )

    def fail_job( self, sge_job_state ):
        """
        Separated out so we can use the worker threads for it.

        Tries to remove the job from SGE, marks it as failed with
        `sge_job_state.fail_message` and removes its temporary files.
        """
        job_wrapper = sge_job_state.job_wrapper
        try:
            self.stop_job( self.sa_session.query( self.app.model.Job ).get( job_wrapper.job_id ) )
        except Exception:
            # Within this file a job is only failed after its status check
            # raised, so terminating it may well raise too.  That must not
            # prevent the job from being marked as failed and cleaned up.
            log.exception( "(%s/%s) Unable to remove job from SGE queue" % ( job_wrapper.job_id, sge_job_state.job_id ) )
        job_wrapper.fail( sge_job_state.fail_message )
        self.cleanup( ( sge_job_state.ofile, sge_job_state.efile, sge_job_state.job_file ) )

    def cleanup( self, files ):
        """Delete each readable path in `files`, unless the 'debug' config option is set."""
        if not asbool( self.app.config.get( 'debug', False ) ):
            for path in files:
                if os.access( path, os.R_OK ):
                    os.unlink( path )

    def put( self, job_wrapper ):
        """Add a job to the queue (by job identifier)"""
        # Change to queued state before handing to worker thread so the runner won't pick it up again
        job_wrapper.change_state( model.Job.states.QUEUED )
        self.work_queue.put( ( 'queue', job_wrapper ) )

    def shutdown( self ):
        """Attempts to gracefully shut down the monitor thread and the worker threads"""
        log.info( "sending stop signal to worker threads" )
        self.monitor_queue.put( self.STOP_SIGNAL )
        # One stop signal per worker, since each worker consumes exactly one
        for worker in self.work_threads:
            self.work_queue.put( ( self.STOP_SIGNAL, None ) )
        log.info( "sge job runner stopped" )

    def stop_job( self, job ):
        """
        Attempts to delete a job from the SGE queue

        A job that SGE no longer knows about is only logged; any other error
        from the DRMAA session propagates.
        """
        try:
            self.ds.control( job.job_runner_external_id, DRMAA.Session.TERMINATE )
            log.debug( "(%s/%s) Removed from SGE queue at user's request" % ( job.id, job.job_runner_external_id ) )
        except DRMAA.InvalidJobError:
            log.debug( "(%s/%s) User killed running job, but it was already dead" % ( job.id, job.job_runner_external_id ) )

    def recover( self, job, job_wrapper ):
        """
        Recovers jobs stuck in the queued/running state when Galaxy started

        Jobs in any other state are not put back on the monitor queue.
        """
        sge_job_state = SGEJobState()
        sge_job_state.ofile, sge_job_state.efile, sge_job_state.job_file = self._job_files( job.id )
        sge_job_state.job_id = str( job.job_runner_external_id )
        sge_job_state.runner_url = job_wrapper.tool.job_runner
        job_wrapper.command_line = job.command_line
        sge_job_state.job_wrapper = job_wrapper
        if job.state == model.Job.states.RUNNING:
            log.debug( "(%s/%s) is still in running state, adding to the SGE queue" % ( job.id, job.job_runner_external_id ) )
            sge_job_state.old_state = DRMAA.Session.RUNNING
            sge_job_state.running = True
            self.monitor_queue.put( sge_job_state )
        elif job.state == model.Job.states.QUEUED:
            log.debug( "(%s/%s) is still in SGE queued state, adding to the SGE queue" % ( job.id, job.job_runner_external_id ) )
            sge_job_state.old_state = DRMAA.Session.QUEUED_ACTIVE
            sge_job_state.running = False
            self.monitor_queue.put( sge_job_state )
|---|