| 1 | import logging, threading, sys, os, time, subprocess, string, tempfile, re, traceback, shutil |
|---|
| 2 | |
|---|
| 3 | import galaxy |
|---|
| 4 | from galaxy import util, model |
|---|
| 5 | from galaxy.model.orm import lazyload |
|---|
| 6 | from galaxy.datatypes.tabular import * |
|---|
| 7 | from galaxy.datatypes.interval import * |
|---|
| 8 | from galaxy.datatypes import metadata |
|---|
| 9 | from galaxy.util.json import from_json_string |
|---|
| 10 | from galaxy.util.expressions import ExpressionContext |
|---|
| 11 | from galaxy.jobs.actions.post import ActionBox |
|---|
| 12 | |
|---|
| 13 | import pkg_resources |
|---|
| 14 | pkg_resources.require( "PasteDeploy" ) |
|---|
| 15 | |
|---|
| 16 | from paste.deploy.converters import asbool |
|---|
| 17 | |
|---|
| 18 | from Queue import Queue, Empty |
|---|
| 19 | |
|---|
log = logging.getLogger( __name__ )

# States for running a job (results of checking whether a queued job may run).
# These are NOT the same as dataset states.
JOB_WAIT = 'wait'
JOB_ERROR = 'error'
JOB_INPUT_ERROR = 'input_error'
JOB_INPUT_DELETED = 'input_deleted'
JOB_READY = 'ready'
JOB_DELETED = 'deleted'
JOB_ADMIN_DELETED = 'admin_deleted'

# Name of an optional file that a tool may create in the job's working
# directory to set advanced metadata properties on the job and its outputs.
# This interface is experimental, is currently only used by the upload tool,
# and should eventually be replaced by a proper API.
TOOL_PROVIDED_JOB_METADATA_FILE = 'galaxy.json'
|---|
| 30 | |
|---|
class JobManager( object ):
    """
    Highest level interface to job management: creates the dispatcher and
    the queues used to start and stop jobs (or a no-op queue when job
    running is disabled in the config).

    TODO: Currently the app accesses "job_queue" and "job_stop_queue" directly.
    This should be decoupled.
    """
    def __init__( self, app ):
        self.app = app
        if not self.app.config.get_bool( "enable_job_running", True ):
            # Job running is disabled: one shared no-op queue serves both roles.
            self.job_queue = self.job_stop_queue = NoopQueue()
            return
        # The dispatcher launches the underlying job runners
        dispatcher = DefaultJobDispatcher( app )
        self.dispatcher = dispatcher
        # Queues for starting and stopping jobs, both feeding the same dispatcher
        self.job_queue = JobQueue( app, dispatcher )
        self.job_stop_queue = JobStopQueue( app, dispatcher )

    def shutdown( self ):
        """Shut down the start queue first, then the stop queue."""
        for owned_queue in ( self.job_queue, self.job_stop_queue ):
            owned_queue.shutdown()
|---|
| 51 | |
|---|
class Sleeper( object ):
    """
    Provides a 'sleep' method that sleeps for a number of seconds *unless*
    the wake method is called (from a different thread).
    """
    def __init__( self ):
        self.condition = threading.Condition()

    def sleep( self, seconds ):
        """Block for up to ``seconds`` seconds, returning early if woken."""
        self.condition.acquire()
        try:
            self.condition.wait( seconds )
        finally:
            # Always release, even if wait() raises; otherwise the lock stays
            # held and any later wake() from another thread blocks forever.
            self.condition.release()

    def wake( self ):
        """Wake one thread currently blocked in sleep(), if there is one."""
        self.condition.acquire()
        try:
            self.condition.notify()
        finally:
            self.condition.release()
|---|
| 67 | |
|---|
class JobQueue( object ):
    """
    Job manager, waits for jobs to be runnable and then dispatches to
    a JobRunner.

    Jobs to check come either from an in-memory queue fed by ``put`` or,
    when the ``track_jobs_in_database`` option is enabled, from a database
    query for jobs in the NEW state. A monitor thread repeatedly checks the
    input datasets of those jobs and hands ready jobs to the dispatcher.
    """
    # Sentinel that shutdown() puts on the in-memory queue to end a monitor step.
    STOP_SIGNAL = object()

    def __init__( self, app, dispatcher ):
        """Start the job manager (starts the monitor thread, then optionally recovers jobs)."""
        self.app = app
        self.sa_session = app.model.context
        # When True, jobs that are ready to run are logged and held back instead of dispatched.
        self.job_lock = False
        # Should we read jobs from the database, or use an in memory queue
        self.track_jobs_in_database = app.config.get_bool( 'track_jobs_in_database', False )
        # Keep track of the pid that started the job manager, only it
        # has valid threads
        self.parent_pid = os.getpid()
        # Contains new jobs. Note this is not used if track_jobs_in_database is True
        self.queue = Queue()
        # Ids of jobs that are waiting (only use from monitor thread).
        # This and the per-step jobs_to_check list are closest to a "Job Queue".
        self.waiting_jobs = []
        # Helper for interruptable sleep
        self.sleeper = Sleeper()
        self.running = True
        self.dispatcher = dispatcher
        self.monitor_thread = threading.Thread( target=self.__monitor )
        self.monitor_thread.start()
        log.info( "job manager started" )
        if app.config.get_bool( 'enable_job_recovery', True ):
            self.__check_jobs_at_startup()

    def __fail_job_with_missing_tool( self, job ):
        """Fail a job being recovered whose tool is no longer in the toolbox."""
        log.warning( "Tool '%s' removed from tool config, unable to recover job: %s" % ( job.tool_id, job.id ) )
        JobWrapper( job, self ).fail( 'This tool was disabled before the job completed. Please contact your Galaxy administrator.' )

    def __check_jobs_at_startup( self ):
        """
        Checks all jobs that are in the 'new', 'queued' or 'running' state in
        the database and requeues or cleans up as necessary. Only run as the
        job manager starts.

        Jobs whose tool is no longer loaded are failed. Queued/running jobs
        with no runner recorded are requeued (reset to NEW when jobs are
        tracked in the database). Jobs with a runner are handed to the
        dispatcher for recovery.
        """
        model = self.app.model
        for job in self.sa_session.query( model.Job ).filter( model.Job.state == model.Job.states.NEW ):
            if job.tool_id not in self.app.toolbox.tools_by_id:
                self.__fail_job_with_missing_tool( job )
            else:
                log.debug( "no runner: %s is still in new state, adding to the jobs queue" %job.id )
                self.queue.put( ( job.id, job.tool_id ) )
        for job in self.sa_session.query( model.Job ).filter( ( model.Job.state == model.Job.states.RUNNING ) | ( model.Job.state == model.Job.states.QUEUED ) ):
            if job.tool_id not in self.app.toolbox.tools_by_id:
                self.__fail_job_with_missing_tool( job )
            elif job.job_runner_name is None:
                log.debug( "no runner: %s is still in queued state, adding to the jobs queue" %job.id )
                if self.track_jobs_in_database:
                    job.state = model.Job.states.NEW
                else:
                    self.queue.put( ( job.id, job.tool_id ) )
            else:
                job_wrapper = JobWrapper( job, self )
                self.dispatcher.recover( job, job_wrapper )
        if self.sa_session.dirty:
            self.sa_session.flush()

    def __monitor( self ):
        """
        Continually iterate the waiting jobs, checking is each is ready to
        run and dispatching if so.
        """
        # HACK: Delay until after forking, we need a way to do post fork notification!!!
        time.sleep( 10 )
        while self.running:
            try:
                self.__monitor_step()
            except Exception:
                # Keep the monitor thread alive on ordinary errors, but let
                # SystemExit / KeyboardInterrupt propagate.
                log.exception( "Exception in monitor_step" )
            # Sleep
            self.sleeper.sleep( 1 )

    def __load_job( self, job_id ):
        """Return the Job with id ``job_id``, or log a warning and return None if it cannot be loaded."""
        job = self.sa_session.query( model.Job ).get( job_id )
        if job is None:
            log.warning( "job %s could not be loaded from the database, it will not be checked" % job_id )
        return job

    def __monitor_step( self ):
        """
        Called repeatedly by `monitor` to process waiting jobs. Gets any new
        jobs (either from the database or from its own queue), then iterates
        over all new and waiting jobs to check the state of the jobs each
        depends on. If the job has dependencies that have not finished, it
        goes to the waiting queue. If the job has dependencies with errors,
        it is marked as having errors and removed from the queue. Otherwise,
        the job is dispatched.
        """
        # Pull all new jobs from the queue at once
        jobs_to_check = []
        if self.track_jobs_in_database:
            # Clear the session so we get fresh states for job and all datasets
            self.sa_session.expunge_all()
            # Fetch all new jobs
            jobs_to_check = self.sa_session.query( model.Job ) \
                .options( lazyload( "external_output_metadata" ), lazyload( "parameters" ) ) \
                .filter( model.Job.state == model.Job.states.NEW ).all()
        else:
            try:
                while True:
                    message = self.queue.get_nowait()
                    if message is self.STOP_SIGNAL:
                        return
                    # Unpack the message; the tool id is not needed here
                    job_id, _tool_id = message
                    # Get the job object and append to watch queue. A job id
                    # that no longer loads is skipped so it cannot abort the
                    # whole step (which would lose the other drained jobs).
                    job = self.__load_job( job_id )
                    if job is not None:
                        jobs_to_check.append( job )
            except Empty:
                pass
        # Get job objects and append to watch queue for any which were
        # previously waiting
        for job_id in self.waiting_jobs:
            job = self.__load_job( job_id )
            if job is not None:
                jobs_to_check.append( job )
        # Iterate over new and waiting jobs and look for any that are
        # ready to run
        new_waiting_jobs = []
        for job in jobs_to_check:
            try:
                # Check the job's dependencies, requeue if they're not done
                job_state = self.__check_if_ready_to_run( job )
                if job_state == JOB_WAIT:
                    if not self.track_jobs_in_database:
                        new_waiting_jobs.append( job.id )
                elif job_state == JOB_INPUT_ERROR:
                    log.info( "job %d unable to run: one or more inputs in error state" % job.id )
                elif job_state == JOB_INPUT_DELETED:
                    log.info( "job %d unable to run: one or more inputs deleted" % job.id )
                elif job_state == JOB_READY:
                    if self.job_lock:
                        log.info( "Job dispatch attempted for %s, but prevented by administrative lock." % job.id )
                        if not self.track_jobs_in_database:
                            new_waiting_jobs.append( job.id )
                    else:
                        self.dispatcher.put( JobWrapper( job, self ) )
                        log.info( "job %d dispatched" % job.id )
                elif job_state == JOB_DELETED:
                    log.info( "job %d deleted by user while still queued" % job.id )
                elif job_state == JOB_ADMIN_DELETED:
                    # Was job.info(...): job.info is a string attribute, not a logger.
                    log.info( "job %d deleted by admin while still queued" % job.id )
                else:
                    log.error( "unknown job state '%s' for job %d" % ( job_state, job.id ) )
                    if not self.track_jobs_in_database:
                        new_waiting_jobs.append( job.id )
            except Exception:
                log.exception( "failure running job %d" % job.id )
        # Update the waiting list
        self.waiting_jobs = new_waiting_jobs
        # Done with the session
        self.sa_session.remove()

    def __check_if_ready_to_run( self, job ):
        """
        Check if a job is ready to run by verifying that each of its input
        datasets is ready (specifically in the OK state). If any input dataset
        has an error, fail the job and return JOB_INPUT_ERROR. If any input
        dataset is deleted, fail the job and return JOB_INPUT_DELETED. If all
        input datasets are in OK state, return JOB_READY indicating that the
        job can be dispatched. Otherwise, return JOB_WAIT indicating that input
        datasets are still being prepared.

        A job already in the DELETED state yields JOB_DELETED, and one in the
        ERROR state (deleted by an administrator) yields JOB_ADMIN_DELETED.
        Inputs in the SETTING_METADATA state count as ready when the job runs
        the external set-metadata tool.
        """
        if job.state == model.Job.states.DELETED:
            return JOB_DELETED
        elif job.state == model.Job.states.ERROR:
            return JOB_ADMIN_DELETED
        for dataset_assoc in job.input_datasets:
            idata = dataset_assoc.dataset
            if not idata:
                continue
            # don't run jobs for which the input dataset was deleted
            if idata.deleted:
                JobWrapper( job, self ).fail( "input data %d (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) )
                return JOB_INPUT_DELETED
            # an error in the input data causes us to bail immediately
            elif idata.state == idata.states.ERROR:
                JobWrapper( job, self ).fail( "input data %d is in error state" % ( idata.hid ) )
                return JOB_INPUT_ERROR
            elif idata.state != idata.states.OK and not ( idata.state == idata.states.SETTING_METADATA and job.tool_id is not None and job.tool_id == self.app.datatypes_registry.set_external_metadata_tool.id ):
                # need to requeue
                return JOB_WAIT
        return JOB_READY

    def put( self, job_id, tool ):
        """
        Add a job to the queue (by job identifier). When jobs are tracked in
        the database nothing is queued; the monitor is just woken up.
        """
        if not self.track_jobs_in_database:
            self.queue.put( ( job_id, tool.id ) )
        self.sleeper.wake()

    def shutdown( self ):
        """Attempts to gracefully shut down the worker thread"""
        if self.parent_pid != os.getpid():
            # We're not the real job queue, do nothing
            return
        log.info( "sending stop signal to worker thread" )
        self.running = False
        if not self.track_jobs_in_database:
            self.queue.put( self.STOP_SIGNAL )
        self.sleeper.wake()
        log.info( "job queue stopped" )
        self.dispatcher.shutdown()
|---|
| 268 | |
|---|
| 269 | class JobWrapper( object ): |
|---|
| 270 | """ |
|---|
| 271 | Wraps a 'model.Job' with convience methods for running processes and |
|---|
| 272 | state management. |
|---|
| 273 | """ |
|---|
def __init__( self, job, queue ):
    """
    Capture what is needed from ``job`` (its ids and tool) and from the
    owning ``queue`` (app and database session) to run and manage the job.
    ``self.tool`` is None when the job's tool id is not in the toolbox.
    """
    self.job_id, self.session_id, self.user_id = job.id, job.session_id, job.user_id
    owner_app = queue.app
    self.tool = owner_app.toolbox.tools_by_id.get( job.tool_id, None )
    self.queue = queue
    self.app = owner_app
    self.sa_session = owner_app.model.context
    self.extra_filenames = []
    self.command_line = None
    self.galaxy_lib_dir = None
    # The working directory must be known before prepare() runs, since job
    # outputs may live there; otherwise premature deletion and job recovery fail.
    job_dir_name = str( self.job_id )
    self.working_directory = os.path.join( owner_app.config.job_working_directory, job_dir_name )
    self.output_paths = None
    self.tool_provided_job_metadata = None
    # Holds what is needed to restore and clean up files used for setting metadata externally
    self.external_output_metadata = metadata.JobExternalOutputMetadataWrapper( job )
|---|
| 294 | |
|---|
def get_param_dict( self ):
    """
    Restore the dictionary of parameters from the database: the job's stored
    (name, value) parameter pairs converted back via the tool's
    ``params_from_strings``.
    """
    job = self.sa_session.query( model.Job ).get( self.job_id )
    stored = {}
    for param in job.parameters:
        stored[ param.name ] = param.value
    return self.tool.params_from_strings( stored, self.app )
|---|
| 303 | |
|---|
def prepare( self ):
    """
    Prepare the job to run by creating the working directory and the
    config files.

    Restores the job's parameters and its input/output datasets from the
    database, builds the tool's param dict, runs the tool's pre-job step
    and "exec_before_job" hook, builds any config files and the (deprecated)
    param file, then builds the command line and persists it on the job.

    Sets ``self.command_line``, ``self.galaxy_lib_dir`` (only when the
    command line starts with 'python'), ``self.dependency_shell_commands``,
    ``self.param_dict`` and ``self.extra_filenames``.

    :returns: list of extra filenames (the config files, plus the param
        file when one was built).
    :raises Exception: if the job has neither a user nor a galaxy session.
    """
    self.sa_session.expunge_all() #this prevents the metadata reverting that has been seen in conjunction with the PBS job runner
    if not os.path.exists( self.working_directory ):
        os.mkdir( self.working_directory )
    # Restore parameters from the database
    job = self.sa_session.query( model.Job ).get( self.job_id )
    if job.user is None and job.galaxy_session is None:
        raise Exception( 'Job %s has no user and no session.' % job.id )
    incoming = dict( [ ( p.name, p.value ) for p in job.parameters ] )
    incoming = self.tool.params_from_strings( incoming, self.app )
    # Do any validation that could not be done at job creation
    self.tool.handle_unvalidated_param_values( incoming, self.app )
    # Restore input / output data lists
    inp_data = dict( [ ( da.name, da.dataset ) for da in job.input_datasets ] )
    out_data = dict( [ ( da.name, da.dataset ) for da in job.output_datasets ] )
    # Library outputs share the same name -> dataset mapping as history outputs
    out_data.update( [ ( da.name, da.dataset ) for da in job.output_library_datasets ] )

    # Set up output dataset association for export history jobs. Because job
    # uses a Dataset rather than an HDA or LDA, it's necessary to set up a
    # fake dataset association that provides the needed attributes for
    # preparing a job.
    class FakeDatasetAssociation ( object ):
        def __init__( self, dataset=None ):
            self.dataset = dataset
            self.file_name = dataset.file_name
            self.metadata = dict()
            self.children = []
    jeha = self.sa_session.query( model.JobExportHistoryArchive ).filter_by( job=job ).first()
    if jeha:
        out_data[ "output_file" ] = FakeDatasetAssociation( dataset=jeha.dataset )
    # These can be passed on the command line if wanted as $userId $userEmail
    if job.history and job.history.user: # check for anonymous user!
        userId = '%d' % job.history.user.id
        userEmail = str(job.history.user.email)
    else:
        userId = 'Anonymous'
        userEmail = 'Anonymous'
    incoming['userId'] = userId
    incoming['userEmail'] = userEmail
    # Build params, done before hook so hook can use
    param_dict = self.tool.build_param_dict( incoming, inp_data, out_data, self.get_output_fnames(), self.working_directory )
    # Certain tools require tasks to be completed prior to job execution
    # ( this used to be performed in the "exec_before_job" hook, but hooks are deprecated ).
    self.tool.exec_before_job( self.queue.app, inp_data, out_data, param_dict )
    # Run the before queue ("exec_before_job") hook
    # Note: the hook receives the raw ``incoming`` values, not ``param_dict``.
    self.tool.call_hook( 'exec_before_job', self.queue.app, inp_data=inp_data,
                         out_data=out_data, tool=self.tool, param_dict=incoming)
    self.sa_session.flush()
    # Build any required config files
    config_filenames = self.tool.build_config_files( param_dict, self.working_directory )
    # FIXME: Build the param file (might return None, DEPRECATED)
    param_filename = self.tool.build_param_file( param_dict, self.working_directory )
    # Build the job's command line
    self.command_line = self.tool.build_command_line( param_dict )
    # FIXME: for now, tools get Galaxy's lib dir in their path
    if self.command_line and self.command_line.startswith( 'python' ):
        self.galaxy_lib_dir = os.path.abspath( "lib" ) # cwd = galaxy root
    # Shell fragment to inject dependencies
    if self.app.config.use_tool_dependencies:
        self.dependency_shell_commands = self.tool.build_dependency_shell_commands()
    else:
        self.dependency_shell_commands = None
    # We need command_line persisted to the db in order for Galaxy to re-queue the job
    # if the server was stopped and restarted before the job finished
    job.command_line = self.command_line
    self.sa_session.add( job )
    self.sa_session.flush()
    # Return list of all extra files
    # NOTE(review): this is the same list object build_config_files returned,
    # so the append below also modifies that list.
    extra_filenames = config_filenames
    if param_filename is not None:
        extra_filenames.append( param_filename )
    self.param_dict = param_dict
    self.extra_filenames = extra_filenames
    return extra_filenames
|---|
| 382 | |
|---|
def fail( self, message, exception=False ):
    """
    Indicate job failure by setting state and message on all output
    datasets.

    :param message: text stored as ``info`` on the job and on every output
        dataset (may be replaced by a tool-provided message, see below).
    :param exception: True when called while handling an exception; the
        current traceback is saved on the job and, if the tool is still
        available, it may supply a better message through
        ``handle_job_failure_exception``.

    A job already in the DELETED state is not marked as failed, but the
    tool's ``job_failed`` hook (when the tool exists) and ``cleanup()``
    run in every case.
    """
    job = self.sa_session.query( model.Job ).get( self.job_id )
    self.sa_session.refresh( job )
    # if the job was deleted, don't fail it
    if not job.state == model.Job.states.DELETED:
        # Check if the failure is due to an exception
        if exception:
            # Save the traceback immediately in case we generate another
            # below
            job.traceback = traceback.format_exc()
            # Let the tool attempt to generate a better message. self.tool is
            # None when the tool was removed from the toolbox; keep the given
            # message then instead of crashing. Only the exception value is
            # kept (not the traceback object) to avoid a reference cycle.
            evalue = sys.exc_info()[1]
            if self.tool:
                m = self.tool.handle_job_failure_exception( evalue )
                if m:
                    message = m
        if self.app.config.outputs_to_working_directory:
            for dataset_path in self.get_output_fnames():
                try:
                    shutil.move( dataset_path.false_path, dataset_path.real_path )
                    log.debug( "fail(): Moved %s to %s" % ( dataset_path.false_path, dataset_path.real_path ) )
                except ( IOError, OSError ):
                    # Fetched via sys.exc_info() so this parses on old and new Pythons alike.
                    e = sys.exc_info()[1]
                    log.error( "fail(): Missing output file in working directory: %s" % e )
        for dataset_assoc in job.output_datasets + job.output_library_datasets:
            dataset = dataset_assoc.dataset
            self.sa_session.refresh( dataset )
            dataset.state = dataset.states.ERROR
            dataset.blurb = 'tool error'
            dataset.info = message
            dataset.set_size()
            if dataset.ext == 'auto':
                dataset.extension = 'data'
            self.sa_session.add( dataset )
            self.sa_session.flush()
        job.state = model.Job.states.ERROR
        job.command_line = self.command_line
        job.info = message
        self.sa_session.add( job )
        self.sa_session.flush()
    # Whether or not the job was deleted, call tool specific fail actions
    # (used for e.g. external metadata) and clean up
    if self.tool:
        self.tool.job_failed( self, message, exception )
    self.cleanup()
|---|
| 430 | |
|---|
def change_state( self, state, info = False ):
    """
    Set ``state`` on every output (history and library) dataset, flushing
    after each one, and then on the job itself. When ``info`` is truthy it
    is also stored as the info message of each dataset and of the job.
    """
    session = self.sa_session
    job = session.query( model.Job ).get( self.job_id )
    session.refresh( job )
    outputs = job.output_datasets + job.output_library_datasets
    for assoc in outputs:
        output = assoc.dataset
        session.refresh( output )
        output.state = state
        if info:
            output.info = info
        session.add( output )
        session.flush()
    if info:
        job.info = info
    job.state = state
    session.add( job )
    session.flush()
|---|
| 447 | |
|---|
def get_state( self ):
    """Return the job's state after refreshing the job from the database."""
    session = self.sa_session
    current_job = session.query( model.Job ).get( self.job_id )
    session.refresh( current_job )
    return current_job.state
|---|
| 452 | |
|---|
def set_runner( self, runner_url, external_id ):
    """
    Record the runner URL handling this job and the external id that
    runner assigned to it, then flush the change.
    """
    session = self.sa_session
    job = session.query( model.Job ).get( self.job_id )
    session.refresh( job )
    job.job_runner_name, job.job_runner_external_id = runner_url, external_id
    session.add( job )
    session.flush()
|---|
| 460 | |
|---|
| 461 | def finish( self, stdout, stderr ): |
|---|
| 462 | """ |
|---|
| 463 | Called to indicate that the associated command has been run. Updates |
|---|
| 464 | the output datasets based on stderr and stdout from the command, and |
|---|
| 465 | the contents of the output files. |
|---|
| 466 | """ |
|---|
| 467 | # default post job setup |
|---|
| 468 | self.sa_session.expunge_all() |
|---|
| 469 | job = self.sa_session.query( model.Job ).get( self.job_id ) |
|---|
| 470 | # if the job was deleted, don't finish it |
|---|
| 471 | if job.state == job.states.DELETED: |
|---|
| 472 | self.cleanup() |
|---|
| 473 | return |
|---|
| 474 | elif job.state == job.states.ERROR: |
|---|
| 475 | # Job was deleted by an administrator |
|---|
| 476 | self.fail( job.info ) |
|---|
| 477 | return |
|---|
| 478 | if stderr: |
|---|
| 479 | job.state = "error" |
|---|
| 480 | else: |
|---|
| 481 | job.state = 'ok' |
|---|
| 482 | if self.app.config.outputs_to_working_directory: |
|---|
| 483 | for dataset_path in self.get_output_fnames(): |
|---|
| 484 | try: |
|---|
| 485 | shutil.move( dataset_path.false_path, dataset_path.real_path ) |
|---|
| 486 | log.debug( "finish(): Moved %s to %s" % ( dataset_path.false_path, dataset_path.real_path ) ) |
|---|
| 487 | except ( IOError, OSError ): |
|---|
| 488 | # this can happen if Galaxy is restarted during the job's |
|---|
| 489 | # finish method - the false_path file has already moved, |
|---|
| 490 | # and when the job is recovered, it won't be found. |
|---|
| 491 | if os.path.exists( dataset_path.real_path ) and os.stat( dataset_path.real_path ).st_size > 0: |
|---|
| 492 | log.warning( "finish(): %s not found, but %s is not empty, so it will be used instead" % ( dataset_path.false_path, dataset_path.real_path ) ) |
|---|
| 493 | else: |
|---|
| 494 | self.fail( "Job %s's output dataset(s) could not be read" % job.id ) |
|---|
| 495 | return |
|---|
| 496 | job_context = ExpressionContext( dict( stdout = stdout, stderr = stderr ) ) |
|---|
| 497 | for dataset_assoc in job.output_datasets + job.output_library_datasets: |
|---|
| 498 | context = self.get_dataset_finish_context( job_context, dataset_assoc.dataset.dataset ) |
|---|
| 499 | #should this also be checking library associations? - can a library item be added from a history before the job has ended? - lets not allow this to occur |
|---|
| 500 | for dataset in dataset_assoc.dataset.dataset.history_associations + dataset_assoc.dataset.dataset.library_associations: #need to update all associated output hdas, i.e. history was shared with job running |
|---|
| 501 | dataset.blurb = 'done' |
|---|
| 502 | dataset.peek = 'no peek' |
|---|
| 503 | dataset.info = context['stdout'] + context['stderr'] |
|---|
| 504 | dataset.set_size() |
|---|
| 505 | if context['stderr']: |
|---|
| 506 | dataset.blurb = "error" |
|---|
| 507 | elif dataset.has_data(): |
|---|
| 508 | # If the tool was expected to set the extension, attempt to retrieve it |
|---|
| 509 | if dataset.ext == 'auto': |
|---|
| 510 | dataset.extension = context.get( 'ext', 'data' ) |
|---|
| 511 | dataset.init_meta( copy_from=dataset ) |
|---|
| 512 | #if a dataset was copied, it won't appear in our dictionary: |
|---|
| 513 | #either use the metadata from originating output dataset, or call set_meta on the copies |
|---|
| 514 | #it would be quicker to just copy the metadata from the originating output dataset, |
|---|
| 515 | #but somewhat trickier (need to recurse up the copied_from tree), for now we'll call set_meta() |
|---|
| 516 | if not self.app.config.set_metadata_externally or \ |
|---|
| 517 | ( not self.external_output_metadata.external_metadata_set_successfully( dataset, self.sa_session ) \ |
|---|
| 518 | and self.app.config.retry_metadata_internally ): |
|---|
| 519 | dataset.set_meta( overwrite = False ) |
|---|
| 520 | elif not self.external_output_metadata.external_metadata_set_successfully( dataset, self.sa_session ) and not context['stderr']: |
|---|
| 521 | dataset._state = model.Dataset.states.FAILED_METADATA |
|---|
| 522 | else: |
|---|
| 523 | #load metadata from file |
|---|
| 524 | #we need to no longer allow metadata to be edited while the job is still running, |
|---|
| 525 | #since if it is edited, the metadata changed on the running output will no longer match |
|---|
| 526 | #the metadata that was stored to disk for use via the external process, |
|---|
| 527 | #and the changes made by the user will be lost, without warning or notice |
|---|
| 528 | dataset.metadata.from_JSON_dict( self.external_output_metadata.get_output_filenames_by_dataset( dataset, self.sa_session ).filename_out ) |
|---|
| 529 | try: |
|---|
| 530 | assert context.get( 'line_count', None ) is not None |
|---|
| 531 | if ( not dataset.datatype.composite_type and dataset.dataset.is_multi_byte() ) or self.tool.is_multi_byte: |
|---|
| 532 | dataset.set_peek( line_count=context['line_count'], is_multi_byte=True ) |
|---|
| 533 | else: |
|---|
| 534 | dataset.set_peek( line_count=context['line_count'] ) |
|---|
| 535 | except: |
|---|
| 536 | if ( not dataset.datatype.composite_type and dataset.dataset.is_multi_byte() ) or self.tool.is_multi_byte: |
|---|
| 537 | dataset.set_peek( is_multi_byte=True ) |
|---|
| 538 | else: |
|---|
| 539 | dataset.set_peek() |
|---|
| 540 | try: |
|---|
| 541 | # set the name if provided by the tool |
|---|
| 542 | dataset.name = context['name'] |
|---|
| 543 | except: |
|---|
| 544 | pass |
|---|
| 545 | else: |
|---|
| 546 | dataset.blurb = "empty" |
|---|
| 547 | if dataset.ext == 'auto': |
|---|
| 548 | dataset.extension = 'txt' |
|---|
| 549 | self.sa_session.add( dataset ) |
|---|
| 550 | if context['stderr']: |
|---|
| 551 | dataset_assoc.dataset.dataset.state = model.Dataset.states.ERROR |
|---|
| 552 | else: |
|---|
| 553 | dataset_assoc.dataset.dataset.state = model.Dataset.states.OK |
|---|
| 554 | # If any of the rest of the finish method below raises an |
|---|
| 555 | # exception, the fail method will run and set the datasets to |
|---|
| 556 | # ERROR. The user will never see that the datasets are in error if |
|---|
| 557 | # they were flushed as OK here, since upon doing so, the history |
|---|
| 558 | # panel stops checking for updates. So allow the |
|---|
| 559 | # self.sa_session.flush() at the bottom of this method set |
|---|
| 560 | # the state instead. |
|---|
| 561 | |
|---|
| 562 | for pja in job.post_job_actions: |
|---|
| 563 | ActionBox.execute(self.app, self.sa_session, pja.post_job_action, job) |
|---|
| 564 | # Flush all the dataset and job changes above. Dataset state changes |
|---|
| 565 | # will now be seen by the user. |
|---|
| 566 | self.sa_session.flush() |
|---|
| 567 | # Save stdout and stderr |
|---|
| 568 | if len( stdout ) > 32768: |
|---|
| 569 | log.error( "stdout for job %d is greater than 32K, only first part will be logged to database" % job.id ) |
|---|
| 570 | job.stdout = stdout[:32768] |
|---|
| 571 | if len( stderr ) > 32768: |
|---|
| 572 | log.error( "stderr for job %d is greater than 32K, only first part will be logged to database" % job.id ) |
|---|
| 573 | job.stderr = stderr[:32768] |
|---|
| 574 | # custom post process setup |
|---|
| 575 | inp_data = dict( [ ( da.name, da.dataset ) for da in job.input_datasets ] ) |
|---|
| 576 | out_data = dict( [ ( da.name, da.dataset ) for da in job.output_datasets ] ) |
|---|
| 577 | out_data.update( [ ( da.name, da.dataset ) for da in job.output_library_datasets ] ) |
|---|
| 578 | param_dict = dict( [ ( p.name, p.value ) for p in job.parameters ] ) # why not re-use self.param_dict here? ##dunno...probably should, this causes tools.parameters.basic.UnvalidatedValue to be used in following methods instead of validated and transformed values during i.e. running workflows |
|---|
| 579 | param_dict = self.tool.params_from_strings( param_dict, self.app ) |
|---|
| 580 | # Check for and move associated_files |
|---|
| 581 | self.tool.collect_associated_files(out_data, self.working_directory) |
|---|
| 582 | # Create generated output children and primary datasets and add to param_dict |
|---|
| 583 | collected_datasets = {'children':self.tool.collect_child_datasets(out_data),'primary':self.tool.collect_primary_datasets(out_data)} |
|---|
| 584 | param_dict.update({'__collected_datasets__':collected_datasets}) |
|---|
| 585 | # Certain tools require tasks to be completed after job execution |
|---|
| 586 | # ( this used to be performed in the "exec_after_process" hook, but hooks are deprecated ). |
|---|
| 587 | self.tool.exec_after_process( self.queue.app, inp_data, out_data, param_dict, job = job ) |
|---|
| 588 | # Call 'exec_after_process' hook |
|---|
| 589 | self.tool.call_hook( 'exec_after_process', self.queue.app, inp_data=inp_data, |
|---|
| 590 | out_data=out_data, param_dict=param_dict, |
|---|
| 591 | tool=self.tool, stdout=stdout, stderr=stderr ) |
|---|
| 592 | job.command_line = self.command_line |
|---|
| 593 | |
|---|
| 594 | # fix permissions |
|---|
| 595 | for path in [ dp.real_path for dp in self.get_output_fnames() ]: |
|---|
| 596 | util.umask_fix_perms( path, self.app.config.umask, 0666, self.app.config.gid ) |
|---|
| 597 | self.sa_session.flush() |
|---|
| 598 | log.debug( 'job %d ended' % self.job_id ) |
|---|
| 599 | self.cleanup() |
|---|
| 600 | |
|---|
def cleanup( self ):
    """
    Remove this job's temporary files and release job-side artifacts.

    Steps: delete every file in ``self.extra_filenames``, remove the job
    working directory (when one is set), clean up external metadata files
    (when ``set_metadata_externally`` is enabled), and run the history
    export archive wrapper's post-job cleanup.

    Cleanup is best-effort and never raises ``Exception`` subclasses: each
    step (and each extra file) is attempted independently and failures are
    logged with a traceback.  Previously a single failure, e.g. one missing
    extra file, aborted every later step and leaked the working directory.
    """
    def remove_extra_files():
        for fname in self.extra_filenames:
            try:
                os.remove( fname )
            except Exception:
                log.exception( "Unable to cleanup job %d" % self.job_id )

    def remove_working_directory():
        if self.working_directory is not None:
            shutil.rmtree( self.working_directory )

    def cleanup_external_metadata():
        if self.app.config.set_metadata_externally:
            self.external_output_metadata.cleanup_external_metadata( self.sa_session )

    def cleanup_history_export():
        galaxy.tools.imp_exp.JobExportHistoryArchiveWrapper( self.job_id ).cleanup_after_job( self.sa_session )

    for step in ( remove_extra_files, remove_working_directory,
                  cleanup_external_metadata, cleanup_history_export ):
        try:
            step()
        except Exception:
            log.exception( "Unable to cleanup job %d" % self.job_id )
|---|
| 613 | |
|---|
def get_command_line( self ):
    """Return the command line stored on this wrapper (``self.command_line``)."""
    command_line = self.command_line
    return command_line
|---|
| 616 | |
|---|
def get_session_id( self ):
    """Return the session id stored on this wrapper (``self.session_id``)."""
    session_id = self.session_id
    return session_id
|---|
| 619 | |
|---|
def get_input_fnames( self ):
    """
    Return the file paths of this job's input datasets.

    For every input association whose dataset is set, the dataset's
    ``file_name`` is listed, immediately followed by the ``file_name`` of
    each of its metadata values that is a ``model.MetadataFile``, so those
    files can be staged in as well.
    """
    job = self.sa_session.query( model.Job ).get( self.job_id )
    paths = []
    # each entry is a JobToInputDatasetAssociation; its dataset may be unset
    for assoc in job.input_datasets:
        input_dataset = assoc.dataset
        if not input_dataset:
            continue
        paths.append( input_dataset.file_name )
        # TODO: would be better to only stage in metadata files that are actually
        # needed (found in command line, referenced in config files, etc.)
        paths.extend( meta_value.file_name
                      for _, meta_value in input_dataset.metadata.items()
                      if isinstance( meta_value, model.MetadataFile ) )
    return paths
|---|
| 632 | |
|---|
def get_output_fnames( self ):
    """
    Return a list of ``DatasetPath`` entries, one per output file of this job.

    Outputs are the job's output datasets, its library output datasets and,
    when the job has a JobExportHistoryArchive (jeha), that archive's
    dataset.  When ``outputs_to_working_directory`` is enabled each entry
    also carries a ``false_path`` ("galaxy_dataset_<id>.dat" inside the job
    working directory); ``str()`` of an entry yields the false path when
    set, otherwise the real dataset path.

    The result is cached on ``self.output_paths``.  It is stored only once
    the list is complete, so an exception while building it no longer
    leaves a truncated list cached for later callers.
    """
    if self.output_paths is not None:
        return self.output_paths

    class DatasetPath( object ):
        def __init__( self, dataset_id, real_path, false_path = None ):
            self.dataset_id = dataset_id
            self.real_path = real_path
            self.false_path = false_path
        def __str__( self ):
            if self.false_path is None:
                return self.real_path
            else:
                return self.false_path

    job = self.sa_session.query( model.Job ).get( self.job_id )
    # Job output datasets are combination of output datasets, library datasets, and jeha datasets.
    jeha = self.sa_session.query( model.JobExportHistoryArchive ).filter_by( job=job ).first()
    if self.app.config.outputs_to_working_directory:
        output_paths = []
        for da in job.output_datasets + job.output_library_datasets:
            data = da.dataset.dataset
            false_path = os.path.abspath( os.path.join( self.working_directory, "galaxy_dataset_%d.dat" % data.id ) )
            output_paths.append( DatasetPath( data.id, data.file_name, false_path ) )
        if jeha:
            false_path = os.path.abspath( os.path.join( self.working_directory, "galaxy_dataset_%d.dat" % jeha.dataset.id ) )
            output_paths.append( DatasetPath( jeha.dataset.id, jeha.dataset.file_name, false_path ) )
    else:
        output_paths = [ DatasetPath( da.dataset.dataset.id, da.dataset.file_name ) for da in job.output_datasets + job.output_library_datasets ]
        if jeha:
            output_paths.append( DatasetPath( jeha.dataset.id, jeha.dataset.file_name ) )

    # Publish the cache only after the list is fully built.
    self.output_paths = output_paths
    return self.output_paths
|---|
| 665 | |
|---|
def get_output_file_id( self, file ):
    """
    Map an output file's base name back to the id of its dataset.

    ``file`` is compared with the base name of each output path from
    ``get_output_fnames()``: first the working-directory ("false") path when
    ``outputs_to_working_directory`` is enabled, then the real path.  The
    first matching entry's ``dataset_id`` is returned, or None if none match.
    """
    if self.output_paths is None:
        self.get_output_fnames()
    for candidate in self.output_paths:
        false_matches = ( self.app.config.outputs_to_working_directory
                          and os.path.basename( candidate.false_path ) == file )
        if false_matches or os.path.basename( candidate.real_path ) == file:
            return candidate.dataset_id
    return None
|---|
| 675 | |
|---|
def get_tool_provided_job_metadata( self ):
    """
    Return the metadata records a tool wrote to TOOL_PROVIDED_JOB_METADATA_FILE.

    The file lives in the job working directory and holds one JSON object
    per line; each object must contain a "type" key.  Lines that fail to
    parse, are not JSON objects, or lack "type" are logged and skipped.
    For "dataset" records without a "dataset_id", the id is resolved from
    the record's "dataset" file name via ``get_output_file_id()``; records
    that have neither are logged and skipped.

    The result is cached on ``self.tool_provided_job_metadata`` once fully
    read.  A missing file yields an empty list.
    """
    if self.tool_provided_job_metadata is not None:
        return self.tool_provided_job_metadata

    # Look for JSONified job metadata
    records = []
    meta_file = os.path.join( self.working_directory, TOOL_PROVIDED_JOB_METADATA_FILE )
    if os.path.exists( meta_file ):
        meta_fh = open( meta_file, 'r' )
        try:
            for line in meta_fh:
                try:
                    line = from_json_string( line )
                    # Explicit check instead of `assert` (stripped under -O).
                    # Non-dict JSON would otherwise crash at line['type'] below.
                    if not isinstance( line, dict ) or 'type' not in line:
                        raise ValueError( 'tool-provided metadata is not an object with a "type" key' )
                except Exception:
                    log.exception( '(%s) Got JSON data from tool, but data is improperly formatted or no "type" key in data' % self.job_id )
                    log.debug( 'Offending data was: %s' % line )
                    continue
                # Set the dataset id if it's a dataset entry and isn't set.
                # This isn't insecure. We loop the job's output datasets in
                # the finish method, so if a tool writes out metadata for a
                # dataset id that it doesn't own, it'll just be ignored.
                if line['type'] == 'dataset' and 'dataset_id' not in line:
                    try:
                        line['dataset_id'] = self.get_output_file_id( line['dataset'] )
                    except KeyError:
                        log.warning( '(%s) Tool provided job dataset-specific metadata without specifying a dataset' % self.job_id )
                        continue
                records.append( line )
        finally:
            # The original never closed this handle.
            meta_fh.close()
    self.tool_provided_job_metadata = records
    return self.tool_provided_job_metadata
|---|
| 704 | |
|---|
def get_dataset_finish_context( self, job_context, dataset ):
    """
    Return the evaluation context to use when finishing ``dataset``.

    The first tool-provided metadata record of type "dataset" whose
    ``dataset_id`` equals ``dataset.id`` is layered over ``job_context`` in
    an ``ExpressionContext``.  If there is no such record, ``job_context``
    itself is returned.
    """
    for record in self.get_tool_provided_job_metadata():
        if record['type'] != 'dataset':
            continue
        if record['dataset_id'] == dataset.id:
            return ExpressionContext( record, job_context )
    return job_context
|---|
| 710 | |
|---|
def check_output_sizes( self ):
    """
    Return a list of ``( path, size_in_bytes )`` tuples, one per output.

    ``path`` is ``str()`` of each entry from ``get_output_fnames()``, i.e.
    the location the tool actually wrote to.  ``os.stat`` errors (e.g. a
    missing output file) propagate to the caller.
    """
    paths = [ str( entry ) for entry in self.get_output_fnames() ]
    return [ ( path, os.stat( path ).st_size ) for path in paths ]
|---|
| 717 | |
|---|
def setup_external_metadata( self, exec_dir = None, tmp_dir = None, dataset_files_path = None, config_root = None, datatypes_config = None, set_extension = True, **kwds ):
    """
    Prepare external metadata setting for this job's output datasets.

    When ``set_extension`` is true, every output whose extension is still
    'auto' (possible for the upload tool) first receives the 'ext' value
    from the tool-provided finish context, or 'data' when none was given.
    Each such change is flushed immediately.  ``tmp_dir``,
    ``dataset_files_path``, ``config_root`` and ``datatypes_config`` fall
    back to the application's configured values when None.  Extra keyword
    arguments are passed through.

    Returns the result of
    ``self.external_output_metadata.setup_external_metadata``.
    """
    job = self.sa_session.query( model.Job ).get( self.job_id )
    if set_extension:
        for assoc in job.output_datasets:
            hda = assoc.dataset
            if hda.ext != 'auto':
                continue
            finish_context = self.get_dataset_finish_context( {}, hda.dataset )
            hda.extension = finish_context.get( 'ext', 'data' )
            self.sa_session.flush()

    # Fill in unspecified locations from the application configuration.
    if tmp_dir is None:
        # this dir should be relative to the exec_dir
        tmp_dir = self.app.config.new_file_path
    if dataset_files_path is None:
        dataset_files_path = self.app.model.Dataset.file_path
    if config_root is None:
        config_root = self.app.config.root
    if datatypes_config is None:
        datatypes_config = self.app.config.datatypes_config

    output_hdas = [ assoc.dataset for assoc in job.output_datasets ]
    return self.external_output_metadata.setup_external_metadata( output_hdas,
                                                                  self.sa_session,
                                                                  exec_dir = exec_dir,
                                                                  tmp_dir = tmp_dir,
                                                                  dataset_files_path = dataset_files_path,
                                                                  config_root = config_root,
                                                                  datatypes_config = datatypes_config,
                                                                  job_metadata = os.path.join( self.working_directory, TOOL_PROVIDED_JOB_METADATA_FILE ),
                                                                  **kwds )
|---|
| 745 | |
|---|
@property
def user( self ):
    """
    Return an identifier (email address) for the owner of this job.

    Checked in order: the job's user, the user of the job's galaxy session,
    and the user of the job's history.  Otherwise the job is anonymous:
    'anonymous@<addr>' is returned, where <addr> is the last
    whitespace-separated token of the session's ``remote_addr``.
    'anonymous@unknown' is returned when there is no session or it has no
    recorded remote address (previously an unset ``remote_addr`` raised
    AttributeError and an empty one raised IndexError).
    """
    job = self.sa_session.query( model.Job ).get( self.job_id )
    if job.user is not None:
        return job.user.email
    session = job.galaxy_session
    if session is not None and session.user is not None:
        return session.user.email
    if job.history is not None and job.history.user is not None:
        return job.history.user.email
    if session is not None:
        addr_parts = ( session.remote_addr or '' ).split()
        if addr_parts:
            return 'anonymous@' + addr_parts[-1]
    return 'anonymous@unknown'
|---|
| 759 | |
|---|
class DefaultJobDispatcher( object ):
    """
    Routes jobs to the job runner plugins started for this application.

    The "local" runner is always started.  Additional runners are named in
    the comma-separated ``start_job_runners`` config option; the supported
    names are local, pbs, sge and drmaa.  A job's runner is selected by the
    part of its runner URL before the first ':'.
    """
    def __init__( self, app ):
        self.app = app
        self.job_runners = {}
        for runner_name in self._parse_runner_names( app.config.start_job_runners ):
            if runner_name == "local":
                import runners.local
                self.job_runners[runner_name] = runners.local.LocalJobRunner( app )
            elif runner_name == "pbs":
                import runners.pbs
                self.job_runners[runner_name] = runners.pbs.PBSJobRunner( app )
            elif runner_name == "sge":
                import runners.sge
                self.job_runners[runner_name] = runners.sge.SGEJobRunner( app )
            elif runner_name == "drmaa":
                import runners.drmaa
                self.job_runners[runner_name] = runners.drmaa.DRMAAJobRunner( app )
            else:
                log.error( "Unable to start unknown job runner: %s" %runner_name )

    @staticmethod
    def _parse_runner_names( start_job_runners ):
        """
        Return the list of runner names to start, "local" first.

        ``start_job_runners`` is the raw comma-separated config string (or
        None).  Names are whitespace-stripped, empty entries are dropped and
        duplicates are removed (first occurrence wins).  This means
        "pbs, sge" or a trailing comma no longer logs a bogus unknown
        runner, and listing "local" no longer starts that runner twice.
        """
        names = [ "local" ]
        if start_job_runners is not None:
            for name in start_job_runners.split( "," ):
                name = name.strip()
                if name and name not in names:
                    names.append( name )
        return names

    def put( self, job_wrapper ):
        """Hand ``job_wrapper`` to the runner named by its tool's job_runner URL."""
        runner_name = ( job_wrapper.tool.job_runner.split(":", 1) )[0]
        log.debug( "dispatching job %d to %s runner" %( job_wrapper.job_id, runner_name ) )
        self.job_runners[runner_name].put( job_wrapper )

    def stop( self, job ):
        """Ask the runner recorded in ``job.job_runner_name`` to stop ``job``."""
        runner_name = ( job.job_runner_name.split(":", 1) )[0]
        log.debug( "stopping job %d in %s runner" %( job.id, runner_name ) )
        self.job_runners[runner_name].stop_job( job )

    def recover( self, job, job_wrapper ):
        """Ask the runner recorded in ``job.job_runner_name`` to recover ``job``."""
        runner_name = ( job.job_runner_name.split(":", 1) )[0]
        log.debug( "recovering job %d in %s runner" %( job.id, runner_name ) )
        self.job_runners[runner_name].recover( job, job_wrapper )

    def shutdown( self ):
        """Shut down every started runner."""
        for runner in self.job_runners.values():
            runner.shutdown()
|---|
| 801 | |
|---|
class JobStopQueue( object ):
    """
    A queue for jobs which need to be terminated prematurely.

    Callers ``put()`` a job id, optionally with an error message.  A
    background monitor thread drains the queue, marks each job ERROR (storing
    the message in ``job.info``) or DELETED, and asks the dispatcher to stop
    any job that has already been handed to a job runner.
    """
    STOP_SIGNAL = object()
    def __init__( self, app, dispatcher ):
        self.app = app
        self.sa_session = app.model.context
        self.dispatcher = dispatcher

        # Keep track of the pid that started the job manager, only it
        # has valid threads
        self.parent_pid = os.getpid()
        # Pending stop requests as ( job_id, error_msg ) tuples
        self.queue = Queue()

        # NOTE(review): not used anywhere in this class; presumably kept for
        # parity with JobQueue -- confirm no external users before removing
        self.waiting = []

        # Helper for interruptable sleep
        self.sleeper = Sleeper()
        self.running = True
        self.monitor_thread = threading.Thread( target=self.monitor )
        self.monitor_thread.start()
        log.info( "job stopper started" )

    def monitor( self ):
        """
        Thread body: repeatedly run `monitor_step` until shut down, sleeping
        via ``self.sleeper`` between steps.
        """
        # HACK: Delay until after forking, we need a way to do post fork notification!!!
        time.sleep( 10 )
        while self.running:
            try:
                self.monitor_step()
            except Exception:
                log.exception( "Exception in monitor_step" )
            # Sleep
            self.sleeper.sleep( 1 )

    def monitor_step( self ):
        """
        Called repeatedly by `monitor` to stop jobs.

        Drains every pending request from the queue, then handles each one
        independently.  A failure on one job (unknown id, database error,
        runner error while stopping) is logged without discarding the other
        requests already pulled off the queue in this step.  Previously the
        first failure aborted the loop and silently lost the rest.
        """
        # Pull all new jobs from the queue at once
        jobs = []
        try:
            while 1:
                ( job_id, error_msg ) = self.queue.get_nowait()
                if job_id is self.STOP_SIGNAL:
                    return
                # Collect for processing below
                jobs.append( ( job_id, error_msg ) )
        except Empty:
            pass

        for job_id, error_msg in jobs:
            try:
                self._stop_job( job_id, error_msg )
            except Exception:
                log.exception( "Unable to stop job %s" % job_id )

    def _stop_job( self, job_id, error_msg ):
        """Mark one job ERROR (with ``error_msg``) or DELETED and stop it in its runner."""
        job = self.sa_session.query( model.Job ).get( job_id )
        if job is None:
            log.warning( "Unable to stop job %s: no such job" % job_id )
            return
        self.sa_session.refresh( job )
        # if desired, error the job so we can inform the user.
        if error_msg is not None:
            job.state = job.states.ERROR
            job.info = error_msg
        else:
            job.state = job.states.DELETED
        self.sa_session.add( job )
        self.sa_session.flush()
        # if job is in JobQueue or FooJobRunner's put method,
        # job_runner_name will be unset and the job will be dequeued due to
        # state change above
        if job.job_runner_name is not None:
            # tell the dispatcher to stop the job
            self.dispatcher.stop( job )

    def put( self, job_id, error_msg=None ):
        """Request that ``job_id`` be stopped; a non-None ``error_msg`` errors it instead of deleting."""
        self.queue.put( ( job_id, error_msg ) )

    def shutdown( self ):
        """Attempts to gracefully shut down the worker thread"""
        if self.parent_pid != os.getpid():
            # We're not the real job queue, do nothing
            return
        else:
            log.info( "sending stop signal to worker thread" )
            self.running = False
            self.queue.put( ( self.STOP_SIGNAL, None ) )
            self.sleeper.wake()
            log.info( "job stopper stopped" )
|---|
| 890 | |
|---|
class NoopQueue( object ):
    """
    Drop-in stand-in for JobQueue / JobStopQueue (``put`` and ``shutdown``)
    in which every call is silently ignored.
    """
    def put( self, *args ):
        """Accept and discard any positional arguments."""
        pass

    def shutdown( self ):
        """Nothing to shut down."""
        pass
|---|
| 899 | |
|---|