[2] | 1 | import logging, threading, sys, os, time, subprocess, string, tempfile, re, traceback, shutil |
---|
| 2 | |
---|
| 3 | import galaxy |
---|
| 4 | from galaxy import util, model |
---|
| 5 | from galaxy.model.orm import lazyload |
---|
| 6 | from galaxy.datatypes.tabular import * |
---|
| 7 | from galaxy.datatypes.interval import * |
---|
| 8 | from galaxy.datatypes import metadata |
---|
| 9 | from galaxy.util.json import from_json_string |
---|
| 10 | from galaxy.util.expressions import ExpressionContext |
---|
| 11 | from galaxy.jobs.actions.post import ActionBox |
---|
| 12 | |
---|
| 13 | import pkg_resources |
---|
| 14 | pkg_resources.require( "PasteDeploy" ) |
---|
| 15 | |
---|
| 16 | from paste.deploy.converters import asbool |
---|
| 17 | |
---|
| 18 | from Queue import Queue, Empty |
---|
| 19 | |
---|
log = logging.getLogger( __name__ )

# States for running a job. These are NOT the same as data states
# (see their use in JobQueue.__monitor_step / __check_if_ready_to_run):
#   JOB_WAIT          - inputs still being produced; re-check on the next pass
#   JOB_ERROR         - generic job error (defined for completeness; not
#                       returned by __check_if_ready_to_run in this module)
#   JOB_INPUT_ERROR   - an input dataset is in an error state; job is failed
#   JOB_INPUT_DELETED - an input dataset was deleted; job is failed
#   JOB_READY         - all inputs OK; job may be dispatched
#   JOB_DELETED       - job deleted by the user while still queued
#   JOB_ADMIN_DELETED - job deleted by an administrator while still queued
JOB_WAIT, JOB_ERROR, JOB_INPUT_ERROR, JOB_INPUT_DELETED, JOB_READY, JOB_DELETED, JOB_ADMIN_DELETED = 'wait', 'error', 'input_error', 'input_deleted', 'ready', 'deleted', 'admin_deleted'

# This file, if created in the job's working directory, will be used for
# setting advanced metadata properties on the job and its associated outputs.
# This interface is currently experimental, is only used by the upload tool,
# and should eventually become API'd
TOOL_PROVIDED_JOB_METADATA_FILE = 'galaxy.json'
---|
| 30 | |
---|
class JobManager( object ):
    """
    Top-level entry point for job management.

    TODO: Currently the app accesses "job_queue" and "job_stop_queue" directly.
    This should be decoupled.
    """
    def __init__( self, app ):
        """Create the dispatcher and queues, or no-op stand-ins when job running is off."""
        self.app = app
        job_running_enabled = self.app.config.get_bool( "enable_job_running", True )
        if not job_running_enabled:
            # Job running is disabled on this server instance; install a
            # single do-nothing queue for both start and stop duties.
            self.job_queue = self.job_stop_queue = NoopQueue()
        else:
            # The dispatcher launches the underlying job runners
            self.dispatcher = DefaultJobDispatcher( app )
            # One queue feeds new jobs to runners, the other handles stops
            self.job_queue = JobQueue( app, self.dispatcher )
            self.job_stop_queue = JobStopQueue( app, self.dispatcher )
    def shutdown( self ):
        """Shut down both worker queues."""
        self.job_queue.shutdown()
        self.job_stop_queue.shutdown()
---|
| 51 | |
---|
class Sleeper( object ):
    """
    Provides a 'sleep' method that sleeps for a number of seconds *unless*
    the notify method is called (from a different thread).
    """
    def __init__( self ):
        self.condition = threading.Condition()
    def sleep( self, seconds ):
        """Block for up to `seconds`, returning early if wake() is called."""
        # Use the condition as a context manager so the underlying lock is
        # always released, even if wait() raises (the manual
        # acquire()/release() pairs this replaces were not exception-safe).
        with self.condition:
            self.condition.wait( seconds )
    def wake( self ):
        """Wake a sleeping thread; a no-op if no thread is waiting."""
        with self.condition:
            self.condition.notify()
---|
| 67 | |
---|
class JobQueue( object ):
    """
    Job manager, waits for jobs to be runnable and then dispatches to
    a JobRunner.
    """
    STOP_SIGNAL = object()
    def __init__( self, app, dispatcher ):
        """Start the job manager"""
        self.app = app
        self.sa_session = app.model.context
        # Administrative lock; when True, ready jobs are held instead of dispatched
        self.job_lock = False
        # Should we read jobs from the database, or use an in memory queue
        self.track_jobs_in_database = app.config.get_bool( 'track_jobs_in_database', False )
        # Keep track of the pid that started the job manager, only it
        # has valid threads
        self.parent_pid = os.getpid()
        # Contains new jobs. Note this is not used if track_jobs_in_database is True
        self.queue = Queue()
        # Contains jobs that are waiting (only use from monitor thread)
        ## This and jobs_to_check[] are closest to a "Job Queue"
        self.waiting_jobs = []
        # Helper for interruptable sleep
        self.sleeper = Sleeper()
        self.running = True
        self.dispatcher = dispatcher
        self.monitor_thread = threading.Thread( target=self.__monitor )
        self.monitor_thread.start()
        log.info( "job manager started" )
        if app.config.get_bool( 'enable_job_recovery', True ):
            self.__check_jobs_at_startup()

    def __check_jobs_at_startup( self ):
        """
        Checks all jobs that are in the 'new', 'queued' or 'running' state in
        the database and requeues or cleans up as necessary. Only run as the
        job manager starts.
        """
        model = self.app.model
        for job in self.sa_session.query( model.Job ).filter( model.Job.state == model.Job.states.NEW ):
            if job.tool_id not in self.app.toolbox.tools_by_id:
                log.warning( "Tool '%s' removed from tool config, unable to recover job: %s" % ( job.tool_id, job.id ) )
                JobWrapper( job, self ).fail( 'This tool was disabled before the job completed. Please contact your Galaxy administrator, or' )
            else:
                log.debug( "no runner: %s is still in new state, adding to the jobs queue" % job.id )
                self.queue.put( ( job.id, job.tool_id ) )
        for job in self.sa_session.query( model.Job ).filter( ( model.Job.state == model.Job.states.RUNNING ) | ( model.Job.state == model.Job.states.QUEUED ) ):
            if job.tool_id not in self.app.toolbox.tools_by_id:
                log.warning( "Tool '%s' removed from tool config, unable to recover job: %s" % ( job.tool_id, job.id ) )
                JobWrapper( job, self ).fail( 'This tool was disabled before the job completed. Please contact your Galaxy administrator, or' )
            elif job.job_runner_name is None:
                log.debug( "no runner: %s is still in queued state, adding to the jobs queue" % job.id )
                if self.track_jobs_in_database:
                    # Reset to NEW so the monitor re-discovers it from the database
                    job.state = model.Job.states.NEW
                else:
                    self.queue.put( ( job.id, job.tool_id ) )
            else:
                # A runner was recorded for this job; let the dispatcher reattach
                job_wrapper = JobWrapper( job, self )
                self.dispatcher.recover( job, job_wrapper )
        if self.sa_session.dirty:
            self.sa_session.flush()

    def __monitor( self ):
        """
        Continually iterate the waiting jobs, checking is each is ready to
        run and dispatching if so.
        """
        # HACK: Delay until after forking, we need a way to do post fork notification!!!
        time.sleep( 10 )
        while self.running:
            try:
                self.__monitor_step()
            except:
                log.exception( "Exception in monitor_step" )
            # Sleep
            self.sleeper.sleep( 1 )

    def __monitor_step( self ):
        """
        Called repeatedly by `monitor` to process waiting jobs. Gets any new
        jobs (either from the database or from its own queue), then iterates
        over all new and waiting jobs to check the state of the jobs each
        depends on. If the job has dependencies that have not finished, it
        goes to the waiting queue. If the job has dependencies with errors,
        it is marked as having errors and removed from the queue. Otherwise,
        the job is dispatched.
        """
        # Pull all new jobs from the queue at once
        jobs_to_check = []
        if self.track_jobs_in_database:
            # Clear the session so we get fresh states for job and all datasets
            self.sa_session.expunge_all()
            # Fetch all new jobs
            jobs_to_check = self.sa_session.query( model.Job ) \
                                .options( lazyload( "external_output_metadata" ), lazyload( "parameters" ) ) \
                                .filter( model.Job.state == model.Job.states.NEW ).all()
        else:
            try:
                while 1:
                    message = self.queue.get_nowait()
                    if message is self.STOP_SIGNAL:
                        return
                    # Unpack the message
                    job_id, tool_id = message
                    # Get the job object and append to watch queue
                    jobs_to_check.append( self.sa_session.query( model.Job ).get( job_id ) )
            except Empty:
                pass
            # Get job objects and append to watch queue for any which were
            # previously waiting
            for job_id in self.waiting_jobs:
                jobs_to_check.append( self.sa_session.query( model.Job ).get( job_id ) )
        # Iterate over new and waiting jobs and look for any that are
        # ready to run
        new_waiting_jobs = []
        for job in jobs_to_check:
            try:
                # Check the job's dependencies, requeue if they're not done
                job_state = self.__check_if_ready_to_run( job )
                if job_state == JOB_WAIT:
                    if not self.track_jobs_in_database:
                        new_waiting_jobs.append( job.id )
                elif job_state == JOB_INPUT_ERROR:
                    log.info( "job %d unable to run: one or more inputs in error state" % job.id )
                elif job_state == JOB_INPUT_DELETED:
                    log.info( "job %d unable to run: one or more inputs deleted" % job.id )
                elif job_state == JOB_READY:
                    if self.job_lock:
                        log.info( "Job dispatch attempted for %s, but prevented by administrative lock." % job.id )
                        if not self.track_jobs_in_database:
                            new_waiting_jobs.append( job.id )
                    else:
                        self.dispatcher.put( JobWrapper( job, self ) )
                        log.info( "job %d dispatched" % job.id )
                elif job_state == JOB_DELETED:
                    log.info( "job %d deleted by user while still queued" % job.id )
                elif job_state == JOB_ADMIN_DELETED:
                    # BUGFIX: this previously read job.info( ... ), which tried
                    # to call the Job model's 'info' attribute as a logger and
                    # raised (the exception was then swallowed below).
                    log.info( "job %d deleted by admin while still queued" % job.id )
                else:
                    log.error( "unknown job state '%s' for job %d" % ( job_state, job.id ) )
                    if not self.track_jobs_in_database:
                        new_waiting_jobs.append( job.id )
            except Exception:
                log.exception( "failure running job %d" % job.id )
        # Update the waiting list
        self.waiting_jobs = new_waiting_jobs
        # Done with the session
        self.sa_session.remove()

    def __check_if_ready_to_run( self, job ):
        """
        Check if a job is ready to run by verifying that each of its input
        datasets is ready (specifically in the OK state). If any input dataset
        has an error, fail the job and return JOB_INPUT_ERROR. If any input
        dataset is deleted, fail the job and return JOB_INPUT_DELETED. If all
        input datasets are in OK state, return JOB_READY indicating that the
        job can be dispatched. Otherwise, return JOB_WAIT indicating that input
        datasets are still being prepared.
        """
        if job.state == model.Job.states.DELETED:
            return JOB_DELETED
        elif job.state == model.Job.states.ERROR:
            return JOB_ADMIN_DELETED
        for dataset_assoc in job.input_datasets:
            idata = dataset_assoc.dataset
            if not idata:
                continue
            # don't run jobs for which the input dataset was deleted
            if idata.deleted:
                JobWrapper( job, self ).fail( "input data %d (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) )
                return JOB_INPUT_DELETED
            # an error in the input data causes us to bail immediately
            elif idata.state == idata.states.ERROR:
                JobWrapper( job, self ).fail( "input data %d is in error state" % ( idata.hid ) )
                return JOB_INPUT_ERROR
            elif idata.state != idata.states.OK and not ( idata.state == idata.states.SETTING_METADATA and job.tool_id is not None and job.tool_id == self.app.datatypes_registry.set_external_metadata_tool.id ):
                # need to requeue
                return JOB_WAIT
        return JOB_READY

    def put( self, job_id, tool ):
        """Add a job to the queue (by job identifier)"""
        if not self.track_jobs_in_database:
            self.queue.put( ( job_id, tool.id ) )
            self.sleeper.wake()

    def shutdown( self ):
        """Attempts to gracefully shut down the worker thread"""
        if self.parent_pid != os.getpid():
            # We're not the real job queue, do nothing
            return
        else:
            log.info( "sending stop signal to worker thread" )
            self.running = False
            if not self.track_jobs_in_database:
                self.queue.put( self.STOP_SIGNAL )
            self.sleeper.wake()
            log.info( "job queue stopped" )
            self.dispatcher.shutdown()
---|
| 268 | |
---|
| 269 | class JobWrapper( object ): |
---|
| 270 | """ |
---|
| 271 | Wraps a 'model.Job' with convience methods for running processes and |
---|
| 272 | state management. |
---|
| 273 | """ |
---|
| 274 | def __init__( self, job, queue ): |
---|
| 275 | self.job_id = job.id |
---|
| 276 | self.session_id = job.session_id |
---|
| 277 | self.user_id = job.user_id |
---|
| 278 | self.tool = queue.app.toolbox.tools_by_id.get( job.tool_id, None ) |
---|
| 279 | self.queue = queue |
---|
| 280 | self.app = queue.app |
---|
| 281 | self.sa_session = self.app.model.context |
---|
| 282 | self.extra_filenames = [] |
---|
| 283 | self.command_line = None |
---|
| 284 | self.galaxy_lib_dir = None |
---|
| 285 | # With job outputs in the working directory, we need the working |
---|
| 286 | # directory to be set before prepare is run, or else premature deletion |
---|
| 287 | # and job recovery fail. |
---|
| 288 | self.working_directory = \ |
---|
| 289 | os.path.join( self.app.config.job_working_directory, str( self.job_id ) ) |
---|
| 290 | self.output_paths = None |
---|
| 291 | self.tool_provided_job_metadata = None |
---|
| 292 | # Wrapper holding the info required to restore and clean up from files used for setting metadata externally |
---|
| 293 | self.external_output_metadata = metadata.JobExternalOutputMetadataWrapper( job ) |
---|
| 294 | |
---|
| 295 | def get_param_dict( self ): |
---|
| 296 | """ |
---|
| 297 | Restore the dictionary of parameters from the database. |
---|
| 298 | """ |
---|
| 299 | job = self.sa_session.query( model.Job ).get( self.job_id ) |
---|
| 300 | param_dict = dict( [ ( p.name, p.value ) for p in job.parameters ] ) |
---|
| 301 | param_dict = self.tool.params_from_strings( param_dict, self.app ) |
---|
| 302 | return param_dict |
---|
| 303 | |
---|
    def prepare( self ):
        """
        Prepare the job to run by creating the working directory and the
        config files.

        Returns the list of "extra" filenames written for the job (config
        files plus the optional deprecated param file); the same list is
        stored on ``self.extra_filenames`` and the built parameter dict on
        ``self.param_dict``. Also persists the built command line to the
        database so the job can be re-queued after a server restart.
        """
        self.sa_session.expunge_all() #this prevents the metadata reverting that has been seen in conjunction with the PBS job runner
        if not os.path.exists( self.working_directory ):
            os.mkdir( self.working_directory )
        # Restore parameters from the database
        job = self.sa_session.query( model.Job ).get( self.job_id )
        if job.user is None and job.galaxy_session is None:
            raise Exception( 'Job %s has no user and no session.' % job.id )
        incoming = dict( [ ( p.name, p.value ) for p in job.parameters ] )
        incoming = self.tool.params_from_strings( incoming, self.app )
        # Do any validation that could not be done at job creation
        self.tool.handle_unvalidated_param_values( incoming, self.app )
        # Restore input / output data lists
        inp_data = dict( [ ( da.name, da.dataset ) for da in job.input_datasets ] )
        out_data = dict( [ ( da.name, da.dataset ) for da in job.output_datasets ] )
        out_data.update( [ ( da.name, da.dataset ) for da in job.output_library_datasets ] )

        # Set up output dataset association for export history jobs. Because job
        # uses a Dataset rather than an HDA or LDA, it's necessary to set up a
        # fake dataset association that provides the needed attributes for
        # preparing a job.
        class FakeDatasetAssociation ( object ):
            # Minimal stand-in exposing just the attributes used when
            # building the param dict below
            def __init__( self, dataset=None ):
                self.dataset = dataset
                self.file_name = dataset.file_name
                self.metadata = dict()
                self.children = []
        jeha = self.sa_session.query( model.JobExportHistoryArchive ).filter_by( job=job ).first()
        if jeha:
            out_data[ "output_file" ] = FakeDatasetAssociation( dataset=jeha.dataset )
        # These can be passed on the command line if wanted as $userId $userEmail
        if job.history and job.history.user: # check for anonymous user!
            userId = '%d' % job.history.user.id
            userEmail = str(job.history.user.email)
        else:
            userId = 'Anonymous'
            userEmail = 'Anonymous'
        incoming['userId'] = userId
        incoming['userEmail'] = userEmail
        # Build params, done before hook so hook can use
        param_dict = self.tool.build_param_dict( incoming, inp_data, out_data, self.get_output_fnames(), self.working_directory )
        # Certain tools require tasks to be completed prior to job execution
        # ( this used to be performed in the "exec_before_job" hook, but hooks are deprecated ).
        self.tool.exec_before_job( self.queue.app, inp_data, out_data, param_dict )
        # Run the before queue ("exec_before_job") hook
        # NOTE(review): the hook is passed `incoming` as its param_dict, not
        # the built `param_dict` above -- presumably intentional; confirm
        # against hook consumers before changing.
        self.tool.call_hook( 'exec_before_job', self.queue.app, inp_data=inp_data,
                             out_data=out_data, tool=self.tool, param_dict=incoming)
        self.sa_session.flush()
        # Build any required config files
        config_filenames = self.tool.build_config_files( param_dict, self.working_directory )
        # FIXME: Build the param file (might return None, DEPRECATED)
        param_filename = self.tool.build_param_file( param_dict, self.working_directory )
        # Build the job's command line
        self.command_line = self.tool.build_command_line( param_dict )
        # FIXME: for now, tools get Galaxy's lib dir in their path
        if self.command_line and self.command_line.startswith( 'python' ):
            self.galaxy_lib_dir = os.path.abspath( "lib" ) # cwd = galaxy root
        # Shell fragment to inject dependencies
        if self.app.config.use_tool_dependencies:
            self.dependency_shell_commands = self.tool.build_dependency_shell_commands()
        else:
            self.dependency_shell_commands = None
        # We need command_line persisted to the db in order for Galaxy to re-queue the job
        # if the server was stopped and restarted before the job finished
        job.command_line = self.command_line
        self.sa_session.add( job )
        self.sa_session.flush()
        # Return list of all extra files
        extra_filenames = config_filenames
        if param_filename is not None:
            extra_filenames.append( param_filename )
        self.param_dict = param_dict
        self.extra_filenames = extra_filenames
        return extra_filenames
---|
| 382 | |
---|
    def fail( self, message, exception=False ):
        """
        Indicate job failure by setting state and message on all output
        datasets.

        `message` becomes the job's and each output dataset's `info` text.
        If `exception` is True, the current traceback is saved on the job and
        the tool is given a chance to translate the exception into a better
        message. Tool-specific failure actions and cleanup() run in all cases.
        """
        job = self.sa_session.query( model.Job ).get( self.job_id )
        self.sa_session.refresh( job )
        # if the job was deleted, don't fail it
        if not job.state == model.Job.states.DELETED:
            # Check if the failure is due to an exception
            if exception:
                # Save the traceback immediately in case we generate another
                # below
                job.traceback = traceback.format_exc()
                # Get the exception and let the tool attempt to generate
                # a better message
                etype, evalue, tb = sys.exc_info()
                m = self.tool.handle_job_failure_exception( evalue )
                if m:
                    message = m
            if self.app.config.outputs_to_working_directory:
                # Outputs were written under the working directory; move any
                # that exist back to their real locations before erroring out
                for dataset_path in self.get_output_fnames():
                    try:
                        shutil.move( dataset_path.false_path, dataset_path.real_path )
                        log.debug( "fail(): Moved %s to %s" % ( dataset_path.false_path, dataset_path.real_path ) )
                    except ( IOError, OSError ), e:
                        # Best-effort: a missing output is logged, not fatal here
                        log.error( "fail(): Missing output file in working directory: %s" % e )
            for dataset_assoc in job.output_datasets + job.output_library_datasets:
                dataset = dataset_assoc.dataset
                self.sa_session.refresh( dataset )
                dataset.state = dataset.states.ERROR
                dataset.blurb = 'tool error'
                dataset.info = message
                dataset.set_size()
                if dataset.ext == 'auto':
                    # Tool never got to set the extension; fall back to generic
                    dataset.extension = 'data'
                self.sa_session.add( dataset )
                self.sa_session.flush()
            job.state = model.Job.states.ERROR
            job.command_line = self.command_line
            job.info = message
            self.sa_session.add( job )
            self.sa_session.flush()
        # Call tool specific fail actions (used for e.g. external metadata)
        # and clean up -- note this runs whether or not the job was deleted
        if self.tool:
            self.tool.job_failed( self, message, exception )
        self.cleanup()
---|
| 430 | |
---|
| 431 | def change_state( self, state, info = False ): |
---|
| 432 | job = self.sa_session.query( model.Job ).get( self.job_id ) |
---|
| 433 | self.sa_session.refresh( job ) |
---|
| 434 | for dataset_assoc in job.output_datasets + job.output_library_datasets: |
---|
| 435 | dataset = dataset_assoc.dataset |
---|
| 436 | self.sa_session.refresh( dataset ) |
---|
| 437 | dataset.state = state |
---|
| 438 | if info: |
---|
| 439 | dataset.info = info |
---|
| 440 | self.sa_session.add( dataset ) |
---|
| 441 | self.sa_session.flush() |
---|
| 442 | if info: |
---|
| 443 | job.info = info |
---|
| 444 | job.state = state |
---|
| 445 | self.sa_session.add( job ) |
---|
| 446 | self.sa_session.flush() |
---|
| 447 | |
---|
| 448 | def get_state( self ): |
---|
| 449 | job = self.sa_session.query( model.Job ).get( self.job_id ) |
---|
| 450 | self.sa_session.refresh( job ) |
---|
| 451 | return job.state |
---|
| 452 | |
---|
| 453 | def set_runner( self, runner_url, external_id ): |
---|
| 454 | job = self.sa_session.query( model.Job ).get( self.job_id ) |
---|
| 455 | self.sa_session.refresh( job ) |
---|
| 456 | job.job_runner_name = runner_url |
---|
| 457 | job.job_runner_external_id = external_id |
---|
| 458 | self.sa_session.add( job ) |
---|
| 459 | self.sa_session.flush() |
---|
| 460 | |
---|
    def finish( self, stdout, stderr ):
        """
        Called to indicate that the associated command has been run. Updates
        the output datasets based on stderr and stdout from the command, and
        the contents of the output files.

        Any stderr output marks the job (and its output datasets) as errored;
        otherwise outputs are finalized (extension, metadata, peek), post-job
        actions run, stdout/stderr are persisted (truncated to 32K), and the
        tool's after-process tasks and hook are invoked before cleanup.
        """
        # default post job setup
        self.sa_session.expunge_all()
        job = self.sa_session.query( model.Job ).get( self.job_id )
        # if the job was deleted, don't finish it
        if job.state == job.states.DELETED:
            self.cleanup()
            return
        elif job.state == job.states.ERROR:
            # Job was deleted by an administrator
            self.fail( job.info )
            return
        # Any stderr output is treated as failure
        if stderr:
            job.state = "error"
        else:
            job.state = 'ok'
        if self.app.config.outputs_to_working_directory:
            # Outputs were written under the working directory; move them to
            # their real destinations now that the command has finished
            for dataset_path in self.get_output_fnames():
                try:
                    shutil.move( dataset_path.false_path, dataset_path.real_path )
                    log.debug( "finish(): Moved %s to %s" % ( dataset_path.false_path, dataset_path.real_path ) )
                except ( IOError, OSError ):
                    # this can happen if Galaxy is restarted during the job's
                    # finish method - the false_path file has already moved,
                    # and when the job is recovered, it won't be found.
                    if os.path.exists( dataset_path.real_path ) and os.stat( dataset_path.real_path ).st_size > 0:
                        log.warning( "finish(): %s not found, but %s is not empty, so it will be used instead" % ( dataset_path.false_path, dataset_path.real_path ) )
                    else:
                        self.fail( "Job %s's output dataset(s) could not be read" % job.id )
                        return
        job_context = ExpressionContext( dict( stdout = stdout, stderr = stderr ) )
        for dataset_assoc in job.output_datasets + job.output_library_datasets:
            # Per-dataset context may override stdout/stderr/ext/name via the
            # tool-provided metadata file (see TOOL_PROVIDED_JOB_METADATA_FILE)
            context = self.get_dataset_finish_context( job_context, dataset_assoc.dataset.dataset )
            #should this also be checking library associations? - can a library item be added from a history before the job has ended? - lets not allow this to occur
            for dataset in dataset_assoc.dataset.dataset.history_associations + dataset_assoc.dataset.dataset.library_associations: #need to update all associated output hdas, i.e. history was shared with job running
                dataset.blurb = 'done'
                dataset.peek = 'no peek'
                dataset.info = context['stdout'] + context['stderr']
                dataset.set_size()
                if context['stderr']:
                    dataset.blurb = "error"
                elif dataset.has_data():
                    # If the tool was expected to set the extension, attempt to retrieve it
                    if dataset.ext == 'auto':
                        dataset.extension = context.get( 'ext', 'data' )
                        dataset.init_meta( copy_from=dataset )
                    #if a dataset was copied, it won't appear in our dictionary:
                    #either use the metadata from originating output dataset, or call set_meta on the copies
                    #it would be quicker to just copy the metadata from the originating output dataset,
                    #but somewhat trickier (need to recurse up the copied_from tree), for now we'll call set_meta()
                    if not self.app.config.set_metadata_externally or \
                       ( not self.external_output_metadata.external_metadata_set_successfully( dataset, self.sa_session ) \
                         and self.app.config.retry_metadata_internally ):
                        # Set metadata in-process (either external metadata is
                        # disabled, or it failed and internal retry is allowed)
                        dataset.set_meta( overwrite = False )
                    elif not self.external_output_metadata.external_metadata_set_successfully( dataset, self.sa_session ) and not context['stderr']:
                        # External metadata failed and no retry allowed
                        dataset._state = model.Dataset.states.FAILED_METADATA
                    else:
                        #load metadata from file
                        #we need to no longer allow metadata to be edited while the job is still running,
                        #since if it is edited, the metadata changed on the running output will no longer match
                        #the metadata that was stored to disk for use via the external process,
                        #and the changes made by the user will be lost, without warning or notice
                        dataset.metadata.from_JSON_dict( self.external_output_metadata.get_output_filenames_by_dataset( dataset, self.sa_session ).filename_out )
                    try:
                        assert context.get( 'line_count', None ) is not None
                        if ( not dataset.datatype.composite_type and dataset.dataset.is_multi_byte() ) or self.tool.is_multi_byte:
                            dataset.set_peek( line_count=context['line_count'], is_multi_byte=True )
                        else:
                            dataset.set_peek( line_count=context['line_count'] )
                    except:
                        # No tool-provided line count; let set_peek compute it
                        if ( not dataset.datatype.composite_type and dataset.dataset.is_multi_byte() ) or self.tool.is_multi_byte:
                            dataset.set_peek( is_multi_byte=True )
                        else:
                            dataset.set_peek()
                    try:
                        # set the name if provided by the tool
                        dataset.name = context['name']
                    except:
                        pass
                else:
                    dataset.blurb = "empty"
                    if dataset.ext == 'auto':
                        dataset.extension = 'txt'
                self.sa_session.add( dataset )
            if context['stderr']:
                dataset_assoc.dataset.dataset.state = model.Dataset.states.ERROR
            else:
                dataset_assoc.dataset.dataset.state = model.Dataset.states.OK
            # If any of the rest of the finish method below raises an
            # exception, the fail method will run and set the datasets to
            # ERROR. The user will never see that the datasets are in error if
            # they were flushed as OK here, since upon doing so, the history
            # panel stops checking for updates. So allow the
            # self.sa_session.flush() at the bottom of this method set
            # the state instead.

        for pja in job.post_job_actions:
            ActionBox.execute(self.app, self.sa_session, pja.post_job_action, job)
        # Flush all the dataset and job changes above. Dataset state changes
        # will now be seen by the user.
        self.sa_session.flush()
        # Save stdout and stderr
        if len( stdout ) > 32768:
            log.error( "stdout for job %d is greater than 32K, only first part will be logged to database" % job.id )
        job.stdout = stdout[:32768]
        if len( stderr ) > 32768:
            log.error( "stderr for job %d is greater than 32K, only first part will be logged to database" % job.id )
        job.stderr = stderr[:32768]
        # custom post process setup
        inp_data = dict( [ ( da.name, da.dataset ) for da in job.input_datasets ] )
        out_data = dict( [ ( da.name, da.dataset ) for da in job.output_datasets ] )
        out_data.update( [ ( da.name, da.dataset ) for da in job.output_library_datasets ] )
        param_dict = dict( [ ( p.name, p.value ) for p in job.parameters ] ) # why not re-use self.param_dict here? ##dunno...probably should, this causes tools.parameters.basic.UnvalidatedValue to be used in following methods instead of validated and transformed values during i.e. running workflows
        param_dict = self.tool.params_from_strings( param_dict, self.app )
        # Check for and move associated_files
        self.tool.collect_associated_files(out_data, self.working_directory)
        # Create generated output children and primary datasets and add to param_dict
        collected_datasets = {'children':self.tool.collect_child_datasets(out_data),'primary':self.tool.collect_primary_datasets(out_data)}
        param_dict.update({'__collected_datasets__':collected_datasets})
        # Certain tools require tasks to be completed after job execution
        # ( this used to be performed in the "exec_after_process" hook, but hooks are deprecated ).
        self.tool.exec_after_process( self.queue.app, inp_data, out_data, param_dict, job = job )
        # Call 'exec_after_process' hook
        self.tool.call_hook( 'exec_after_process', self.queue.app, inp_data=inp_data,
                             out_data=out_data, param_dict=param_dict,
                             tool=self.tool, stdout=stdout, stderr=stderr )
        job.command_line = self.command_line

        # fix permissions
        for path in [ dp.real_path for dp in self.get_output_fnames() ]:
            util.umask_fix_perms( path, self.app.config.umask, 0666, self.app.config.gid )
        self.sa_session.flush()
        log.debug( 'job %d ended' % self.job_id )
        self.cleanup()
---|
| 600 | |
---|
| 601 | def cleanup( self ): |
---|
| 602 | # remove temporary files |
---|
| 603 | try: |
---|
| 604 | for fname in self.extra_filenames: |
---|
| 605 | os.remove( fname ) |
---|
| 606 | if self.working_directory is not None: |
---|
| 607 | shutil.rmtree( self.working_directory ) |
---|
| 608 | if self.app.config.set_metadata_externally: |
---|
| 609 | self.external_output_metadata.cleanup_external_metadata( self.sa_session ) |
---|
| 610 | galaxy.tools.imp_exp.JobExportHistoryArchiveWrapper( self.job_id ).cleanup_after_job( self.sa_session ) |
---|
| 611 | except: |
---|
| 612 | log.exception( "Unable to cleanup job %d" % self.job_id ) |
---|
| 613 | |
---|
| 614 | def get_command_line( self ): |
---|
| 615 | return self.command_line |
---|
| 616 | |
---|
| 617 | def get_session_id( self ): |
---|
| 618 | return self.session_id |
---|
| 619 | |
---|
| 620 | def get_input_fnames( self ): |
---|
| 621 | job = self.sa_session.query( model.Job ).get( self.job_id ) |
---|
| 622 | filenames = [] |
---|
| 623 | for da in job.input_datasets: #da is JobToInputDatasetAssociation object |
---|
| 624 | if da.dataset: |
---|
| 625 | filenames.append( da.dataset.file_name ) |
---|
| 626 | #we will need to stage in metadata file names also |
---|
| 627 | #TODO: would be better to only stage in metadata files that are actually needed (found in command line, referenced in config files, etc.) |
---|
| 628 | for key, value in da.dataset.metadata.items(): |
---|
| 629 | if isinstance( value, model.MetadataFile ): |
---|
| 630 | filenames.append( value.file_name ) |
---|
| 631 | return filenames |
---|
| 632 | |
---|
| 633 | def get_output_fnames( self ): |
---|
| 634 | if self.output_paths is not None: |
---|
| 635 | return self.output_paths |
---|
| 636 | |
---|
| 637 | class DatasetPath( object ): |
---|
| 638 | def __init__( self, dataset_id, real_path, false_path = None ): |
---|
| 639 | self.dataset_id = dataset_id |
---|
| 640 | self.real_path = real_path |
---|
| 641 | self.false_path = false_path |
---|
| 642 | def __str__( self ): |
---|
| 643 | if self.false_path is None: |
---|
| 644 | return self.real_path |
---|
| 645 | else: |
---|
| 646 | return self.false_path |
---|
| 647 | |
---|
| 648 | job = self.sa_session.query( model.Job ).get( self.job_id ) |
---|
| 649 | # Job output datasets are combination of output datasets, library datasets, and jeha datasets. |
---|
| 650 | jeha = self.sa_session.query( model.JobExportHistoryArchive ).filter_by( job=job ).first() |
---|
| 651 | if self.app.config.outputs_to_working_directory: |
---|
| 652 | self.output_paths = [] |
---|
| 653 | for name, data in [ ( da.name, da.dataset.dataset ) for da in job.output_datasets + job.output_library_datasets ]: |
---|
| 654 | false_path = os.path.abspath( os.path.join( self.working_directory, "galaxy_dataset_%d.dat" % data.id ) ) |
---|
| 655 | self.output_paths.append( DatasetPath( data.id, data.file_name, false_path ) ) |
---|
| 656 | if jeha: |
---|
| 657 | false_path = os.path.abspath( os.path.join( self.working_directory, "galaxy_dataset_%d.dat" % jeha.dataset.id ) ) |
---|
| 658 | self.output_paths.append( DatasetPath( jeha.dataset.id, jeha.dataset.file_name, false_path ) ) |
---|
| 659 | else: |
---|
| 660 | self.output_paths = [ DatasetPath( da.dataset.dataset.id, da.dataset.file_name ) for da in job.output_datasets + job.output_library_datasets ] |
---|
| 661 | if jeha: |
---|
| 662 | self.output_paths.append( DatasetPath( jeha.dataset.id, jeha.dataset.file_name ) ) |
---|
| 663 | |
---|
| 664 | return self.output_paths |
---|
| 665 | |
---|
| 666 | def get_output_file_id( self, file ): |
---|
| 667 | if self.output_paths is None: |
---|
| 668 | self.get_output_fnames() |
---|
| 669 | for dp in self.output_paths: |
---|
| 670 | if self.app.config.outputs_to_working_directory and os.path.basename( dp.false_path ) == file: |
---|
| 671 | return dp.dataset_id |
---|
| 672 | elif os.path.basename( dp.real_path ) == file: |
---|
| 673 | return dp.dataset_id |
---|
| 674 | return None |
---|
| 675 | |
---|
| 676 | def get_tool_provided_job_metadata( self ): |
---|
| 677 | if self.tool_provided_job_metadata is not None: |
---|
| 678 | return self.tool_provided_job_metadata |
---|
| 679 | |
---|
| 680 | # Look for JSONified job metadata |
---|
| 681 | self.tool_provided_job_metadata = [] |
---|
| 682 | meta_file = os.path.join( self.working_directory, TOOL_PROVIDED_JOB_METADATA_FILE ) |
---|
| 683 | if os.path.exists( meta_file ): |
---|
| 684 | for line in open( meta_file, 'r' ): |
---|
| 685 | try: |
---|
| 686 | line = from_json_string( line ) |
---|
| 687 | assert 'type' in line |
---|
| 688 | except: |
---|
| 689 | log.exception( '(%s) Got JSON data from tool, but data is improperly formatted or no "type" key in data' % self.job_id ) |
---|
| 690 | log.debug( 'Offending data was: %s' % line ) |
---|
| 691 | continue |
---|
| 692 | # Set the dataset id if it's a dataset entry and isn't set. |
---|
| 693 | # This isn't insecure. We loop the job's output datasets in |
---|
| 694 | # the finish method, so if a tool writes out metadata for a |
---|
| 695 | # dataset id that it doesn't own, it'll just be ignored. |
---|
| 696 | if line['type'] == 'dataset' and 'dataset_id' not in line: |
---|
| 697 | try: |
---|
| 698 | line['dataset_id'] = self.get_output_file_id( line['dataset'] ) |
---|
| 699 | except KeyError: |
---|
| 700 | log.warning( '(%s) Tool provided job dataset-specific metadata without specifying a dataset' % self.job_id ) |
---|
| 701 | continue |
---|
| 702 | self.tool_provided_job_metadata.append( line ) |
---|
| 703 | return self.tool_provided_job_metadata |
---|
| 704 | |
---|
| 705 | def get_dataset_finish_context( self, job_context, dataset ): |
---|
| 706 | for meta in self.get_tool_provided_job_metadata(): |
---|
| 707 | if meta['type'] == 'dataset' and meta['dataset_id'] == dataset.id: |
---|
| 708 | return ExpressionContext( meta, job_context ) |
---|
| 709 | return job_context |
---|
| 710 | |
---|
| 711 | def check_output_sizes( self ): |
---|
| 712 | sizes = [] |
---|
| 713 | output_paths = self.get_output_fnames() |
---|
| 714 | for outfile in [ str( o ) for o in output_paths ]: |
---|
| 715 | sizes.append( ( outfile, os.stat( outfile ).st_size ) ) |
---|
| 716 | return sizes |
---|
| 717 | |
---|
| 718 | def setup_external_metadata( self, exec_dir = None, tmp_dir = None, dataset_files_path = None, config_root = None, datatypes_config = None, set_extension = True, **kwds ): |
---|
| 719 | # extension could still be 'auto' if this is the upload tool. |
---|
| 720 | job = self.sa_session.query( model.Job ).get( self.job_id ) |
---|
| 721 | if set_extension: |
---|
| 722 | for output_dataset_assoc in job.output_datasets: |
---|
| 723 | if output_dataset_assoc.dataset.ext == 'auto': |
---|
| 724 | context = self.get_dataset_finish_context( dict(), output_dataset_assoc.dataset.dataset ) |
---|
| 725 | output_dataset_assoc.dataset.extension = context.get( 'ext', 'data' ) |
---|
| 726 | self.sa_session.flush() |
---|
| 727 | if tmp_dir is None: |
---|
| 728 | #this dir should should relative to the exec_dir |
---|
| 729 | tmp_dir = self.app.config.new_file_path |
---|
| 730 | if dataset_files_path is None: |
---|
| 731 | dataset_files_path = self.app.model.Dataset.file_path |
---|
| 732 | if config_root is None: |
---|
| 733 | config_root = self.app.config.root |
---|
| 734 | if datatypes_config is None: |
---|
| 735 | datatypes_config = self.app.config.datatypes_config |
---|
| 736 | return self.external_output_metadata.setup_external_metadata( [ output_dataset_assoc.dataset for output_dataset_assoc in job.output_datasets ], |
---|
| 737 | self.sa_session, |
---|
| 738 | exec_dir = exec_dir, |
---|
| 739 | tmp_dir = tmp_dir, |
---|
| 740 | dataset_files_path = dataset_files_path, |
---|
| 741 | config_root = config_root, |
---|
| 742 | datatypes_config = datatypes_config, |
---|
| 743 | job_metadata = os.path.join( self.working_directory, TOOL_PROVIDED_JOB_METADATA_FILE ), |
---|
| 744 | **kwds ) |
---|
| 745 | |
---|
| 746 | @property |
---|
| 747 | def user( self ): |
---|
| 748 | job = self.sa_session.query( model.Job ).get( self.job_id ) |
---|
| 749 | if job.user is not None: |
---|
| 750 | return job.user.email |
---|
| 751 | elif job.galaxy_session is not None and job.galaxy_session.user is not None: |
---|
| 752 | return job.galaxy_session.user.email |
---|
| 753 | elif job.history is not None and job.history.user is not None: |
---|
| 754 | return job.history.user.email |
---|
| 755 | elif job.galaxy_session is not None: |
---|
| 756 | return 'anonymous@' + job.galaxy_session.remote_addr.split()[-1] |
---|
| 757 | else: |
---|
| 758 | return 'anonymous@unknown' |
---|
| 759 | |
---|
class DefaultJobDispatcher( object ):
    """
    Routes jobs to the appropriate runner plugin, keyed by the runner-name
    prefix of the tool's job_runner URL (e.g. 'pbs' in 'pbs:///queue').
    """
    def __init__( self, app ):
        self.app = app
        self.job_runners = {}
        # The local runner is always started; additional runners are listed
        # (comma-separated) in the start_job_runners config option.
        requested = ["local"]
        if app.config.start_job_runners is not None:
            requested.extend( app.config.start_job_runners.split(",") )
        for runner_name in requested:
            self._start_runner( app, runner_name )

    def _start_runner( self, app, runner_name ):
        """Import and instantiate a single runner plugin by name."""
        if runner_name == "local":
            import runners.local
            runner = runners.local.LocalJobRunner( app )
        elif runner_name == "pbs":
            import runners.pbs
            runner = runners.pbs.PBSJobRunner( app )
        elif runner_name == "sge":
            import runners.sge
            runner = runners.sge.SGEJobRunner( app )
        elif runner_name == "drmaa":
            import runners.drmaa
            runner = runners.drmaa.DRMAAJobRunner( app )
        else:
            log.error( "Unable to start unknown job runner: %s" %runner_name )
            return
        self.job_runners[runner_name] = runner

    def put( self, job_wrapper ):
        """Hand a ready job to the runner named in the tool's job_runner URL."""
        runner_name = job_wrapper.tool.job_runner.split(":", 1)[0]
        log.debug( "dispatching job %d to %s runner" %( job_wrapper.job_id, runner_name ) )
        self.job_runners[runner_name].put( job_wrapper )

    def stop( self, job ):
        """Ask the runner that owns *job* to stop it."""
        runner_name = job.job_runner_name.split(":", 1)[0]
        log.debug( "stopping job %d in %s runner" %( job.id, runner_name ) )
        self.job_runners[runner_name].stop_job( job )

    def recover( self, job, job_wrapper ):
        """Hand a job back to its runner for recovery after a restart."""
        runner_name = job.job_runner_name.split(":", 1)[0]
        log.debug( "recovering job %d in %s runner" %( job.id, runner_name ) )
        self.job_runners[runner_name].recover( job, job_wrapper )

    def shutdown( self ):
        """Shut down every started runner."""
        for runner in self.job_runners.itervalues():
            runner.shutdown()
---|
| 801 | |
---|
class JobStopQueue( object ):
    """
    A queue for jobs which need to be terminated prematurely.

    Stop requests are (job_id, error_msg) tuples; a dedicated monitor thread
    drains them, marks each job DELETED (or ERROR when an error message is
    supplied) and asks the dispatcher to stop any job already handed to a
    runner.
    """
    # Sentinel placed on the queue to make the monitor thread exit.
    STOP_SIGNAL = object()
    def __init__( self, app, dispatcher ):
        self.app = app
        self.sa_session = app.model.context
        self.dispatcher = dispatcher

        # Keep track of the pid that started the job manager, only it
        # has valid threads
        self.parent_pid = os.getpid()
        # Contains new jobs. Note this is not used if track_jobs_in_database is True
        self.queue = Queue()

        # Contains jobs that are waiting (only use from monitor thread)
        self.waiting = []

        # Helper for interruptable sleep
        self.sleeper = Sleeper()
        self.running = True
        self.monitor_thread = threading.Thread( target=self.monitor )
        self.monitor_thread.start()
        log.info( "job stopper started" )

    def monitor( self ):
        """
        Continually iterate the waiting jobs, stop any that are found.
        """
        # HACK: Delay until after forking, we need a way to do post fork notification!!!
        time.sleep( 10 )
        while self.running:
            try:
                self.monitor_step()
            except Exception:
                # Narrowed from a bare except so SystemExit/KeyboardInterrupt
                # can still terminate the monitor thread.
                log.exception( "Exception in monitor_step" )
            # Sleep
            self.sleeper.sleep( 1 )

    def monitor_step( self ):
        """
        Called repeatedly by `monitor` to stop jobs.
        """
        # Pull all new jobs from the queue at once
        jobs = []
        try:
            while 1:
                ( job_id, error_msg ) = self.queue.get_nowait()
                if job_id is self.STOP_SIGNAL:
                    return
                # Append to watch queue
                jobs.append( ( job_id, error_msg ) )
        except Empty:
            pass

        for job_id, error_msg in jobs:
            job = self.sa_session.query( model.Job ).get( job_id )
            self.sa_session.refresh( job )
            # if desired, error the job so we can inform the user.
            if error_msg is not None:
                job.state = job.states.ERROR
                job.info = error_msg
            else:
                job.state = job.states.DELETED
            self.sa_session.add( job )
            self.sa_session.flush()
            # if job is in JobQueue or FooJobRunner's put method,
            # job_runner_name will be unset and the job will be dequeued due to
            # state change above
            if job.job_runner_name is not None:
                # tell the dispatcher to stop the job
                self.dispatcher.stop( job )

    def put( self, job_id, error_msg=None ):
        """Request that *job_id* be stopped; error_msg marks it ERROR instead of DELETED."""
        self.queue.put( ( job_id, error_msg ) )

    def shutdown( self ):
        """Attempts to gracefully shut down the worker thread"""
        if self.parent_pid != os.getpid():
            # We're not the real job queue, do nothing
            return
        else:
            log.info( "sending stop signal to worker thread" )
            self.running = False
            self.queue.put( ( self.STOP_SIGNAL, None ) )
            self.sleeper.wake()
            log.info( "job stopper stopped" )
---|
| 890 | |
---|
class NoopQueue( object ):
    """
    Implements the JobQueue / JobStopQueue interface but does nothing
    """
    def put( self, *args ):
        """Accept and discard any job message."""
        return
    def shutdown( self ):
        """Nothing to shut down."""
        return
---|
| 899 | |
---|