import logging, threading, sys, os, time, subprocess, string, tempfile, re, traceback, shutil

import galaxy
from galaxy import util, model
from galaxy.model.orm import lazyload
from galaxy.datatypes.tabular import *
from galaxy.datatypes.interval import *
from galaxy.datatypes import metadata
from galaxy.util.json import from_json_string
from galaxy.util.expressions import ExpressionContext
from galaxy.jobs.actions.post import ActionBox

import pkg_resources
pkg_resources.require( "PasteDeploy" )

from paste.deploy.converters import asbool

from Queue import Queue, Empty

log = logging.getLogger( __name__ )

# States for running a job. These are NOT the same as data states
JOB_WAIT, JOB_ERROR, JOB_INPUT_ERROR, JOB_INPUT_DELETED, JOB_READY, JOB_DELETED, JOB_ADMIN_DELETED = 'wait', 'error', 'input_error', 'input_deleted', 'ready', 'deleted', 'admin_deleted'

# This file, if created in the job's working directory, will be used for
# setting advanced metadata properties on the job and its associated outputs.
# This interface is currently experimental, is only used by the upload tool,
# and should eventually become API'd
TOOL_PROVIDED_JOB_METADATA_FILE = 'galaxy.json'
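
# Illustrative sketch (not part of the original source): each line of the
# galaxy.json file is expected to be a standalone JSON object with at least a
# "type" key; "dataset" entries also name the output file they describe so
# that JobWrapper.get_output_file_id() can resolve it to a dataset id. The
# keys and values below are examples only:
#
#   {"type": "dataset", "dataset": "galaxy_dataset_1.dat", "ext": "tabular", "name": "My output"}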

class JobManager( object ):
    """
    Highest level interface to job management.

    TODO: Currently the app accesses "job_queue" and "job_stop_queue" directly.
    This should be decoupled.
    """
    def __init__( self, app ):
        self.app = app
        if self.app.config.get_bool( "enable_job_running", True ):
            # The dispatcher launches the underlying job runners
            self.dispatcher = DefaultJobDispatcher( app )
            # Queues for starting and stopping jobs
            self.job_queue = JobQueue( app, self.dispatcher )
            self.job_stop_queue = JobStopQueue( app, self.dispatcher )
        else:
            self.job_queue = self.job_stop_queue = NoopQueue()
    def shutdown( self ):
        self.job_queue.shutdown()
        self.job_stop_queue.shutdown()
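
# Illustrative sketch (assumed wiring, per the TODO above): the app holds a
# single JobManager and pushes work onto its queues directly, e.g.:
#
#   manager = JobManager( app )
#   manager.job_queue.put( job.id, tool )   # enqueue a newly created job
#   manager.job_stop_queue.put( job.id )    # request that a job be stopped
#   manager.shutdown()                      # at server exit, stop both worker threads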

class Sleeper( object ):
    """
    Provides a 'sleep' method that sleeps for a number of seconds *unless*
    the wake method is called (from a different thread).
    """
    def __init__( self ):
        self.condition = threading.Condition()
    def sleep( self, seconds ):
        self.condition.acquire()
        self.condition.wait( seconds )
        self.condition.release()
    def wake( self ):
        self.condition.acquire()
        self.condition.notify()
        self.condition.release()
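
# Illustrative sketch: how the monitor threads below use Sleeper for an
# interruptible wait.
#
#   sleeper = Sleeper()
#   sleeper.sleep( 1 )   # monitor thread: returns after 1 second, or as soon as wake() is called
#   sleeper.wake()       # another thread (e.g. put() or shutdown()): cut the wait short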

class JobQueue( object ):
    """
    Job manager, waits for jobs to be runnable and then dispatches to
    a JobRunner.
    """
    STOP_SIGNAL = object()
    def __init__( self, app, dispatcher ):
        """Start the job manager"""
        self.app = app
        self.sa_session = app.model.context
        self.job_lock = False
        # Should we read jobs from the database, or use an in-memory queue?
        self.track_jobs_in_database = app.config.get_bool( 'track_jobs_in_database', False )
        # Keep track of the pid that started the job manager, only it
        # has valid threads
        self.parent_pid = os.getpid()
        # Contains new jobs. Note this is not used if track_jobs_in_database is True
        self.queue = Queue()

        # Contains jobs that are waiting (only use from monitor thread)
        ## This and jobs_to_check[] are closest to a "Job Queue"
        self.waiting_jobs = []

        # Helper for interruptable sleep
        self.sleeper = Sleeper()
        self.running = True
        self.dispatcher = dispatcher
        self.monitor_thread = threading.Thread( target=self.__monitor )
        self.monitor_thread.start()
        log.info( "job manager started" )
        if app.config.get_bool( 'enable_job_recovery', True ):
            self.__check_jobs_at_startup()

    def __check_jobs_at_startup( self ):
        """
        Checks all jobs that are in the 'new', 'queued' or 'running' state in
        the database and requeues or cleans up as necessary. Only run as the
        job manager starts.
        """
        model = self.app.model
        for job in self.sa_session.query( model.Job ).filter( model.Job.state == model.Job.states.NEW ):
            if job.tool_id not in self.app.toolbox.tools_by_id:
                log.warning( "Tool '%s' removed from tool config, unable to recover job: %s" % ( job.tool_id, job.id ) )
                JobWrapper( job, self ).fail( 'This tool was disabled before the job completed. Please contact your Galaxy administrator.' )
            else:
                log.debug( "no runner: %s is still in new state, adding to the jobs queue" % job.id )
                self.queue.put( ( job.id, job.tool_id ) )
        for job in self.sa_session.query( model.Job ).filter( ( model.Job.state == model.Job.states.RUNNING ) | ( model.Job.state == model.Job.states.QUEUED ) ):
            if job.tool_id not in self.app.toolbox.tools_by_id:
                log.warning( "Tool '%s' removed from tool config, unable to recover job: %s" % ( job.tool_id, job.id ) )
                JobWrapper( job, self ).fail( 'This tool was disabled before the job completed. Please contact your Galaxy administrator.' )
            elif job.job_runner_name is None:
                log.debug( "no runner: %s is still in queued state, adding to the jobs queue" % job.id )
                if self.track_jobs_in_database:
                    job.state = model.Job.states.NEW
                else:
                    self.queue.put( ( job.id, job.tool_id ) )
            else:
                job_wrapper = JobWrapper( job, self )
                self.dispatcher.recover( job, job_wrapper )
        if self.sa_session.dirty:
            self.sa_session.flush()

    def __monitor( self ):
        """
        Continually iterate the waiting jobs, checking if each is ready to
        run and dispatching if so.
        """
        # HACK: Delay until after forking, we need a way to do post fork notification!!!
        time.sleep( 10 )
        while self.running:
            try:
                self.__monitor_step()
            except:
                log.exception( "Exception in monitor_step" )
            # Sleep
            self.sleeper.sleep( 1 )

    def __monitor_step( self ):
        """
        Called repeatedly by `monitor` to process waiting jobs. Gets any new
        jobs (either from the database or from its own queue), then iterates
        over all new and waiting jobs to check the state of the jobs each
        depends on. If the job has dependencies that have not finished, it
        goes to the waiting queue. If the job has dependencies with errors,
        it is marked as having errors and removed from the queue. Otherwise,
        the job is dispatched.
        """
        # Pull all new jobs from the queue at once
        jobs_to_check = []
        if self.track_jobs_in_database:
            # Clear the session so we get fresh states for job and all datasets
            self.sa_session.expunge_all()
            # Fetch all new jobs
            jobs_to_check = self.sa_session.query( model.Job ) \
                                .options( lazyload( "external_output_metadata" ), lazyload( "parameters" ) ) \
                                .filter( model.Job.state == model.Job.states.NEW ).all()
        else:
            try:
                while 1:
                    message = self.queue.get_nowait()
                    if message is self.STOP_SIGNAL:
                        return
                    # Unpack the message
                    job_id, tool_id = message
                    # Get the job object and append to watch queue
                    jobs_to_check.append( self.sa_session.query( model.Job ).get( job_id ) )
            except Empty:
                pass
        # Get job objects and append to watch queue for any which were
        # previously waiting
        for job_id in self.waiting_jobs:
            jobs_to_check.append( self.sa_session.query( model.Job ).get( job_id ) )
        # Iterate over new and waiting jobs and look for any that are
        # ready to run
        new_waiting_jobs = []
        for job in jobs_to_check:
            try:
                # Check the job's dependencies, requeue if they're not done
                job_state = self.__check_if_ready_to_run( job )
                if job_state == JOB_WAIT:
                    if not self.track_jobs_in_database:
                        new_waiting_jobs.append( job.id )
                elif job_state == JOB_INPUT_ERROR:
                    log.info( "job %d unable to run: one or more inputs in error state" % job.id )
                elif job_state == JOB_INPUT_DELETED:
                    log.info( "job %d unable to run: one or more inputs deleted" % job.id )
                elif job_state == JOB_READY:
                    if self.job_lock:
                        log.info( "Job dispatch attempted for %s, but prevented by administrative lock." % job.id )
                        if not self.track_jobs_in_database:
                            new_waiting_jobs.append( job.id )
                    else:
                        self.dispatcher.put( JobWrapper( job, self ) )
                        log.info( "job %d dispatched" % job.id )
                elif job_state == JOB_DELETED:
                    log.info( "job %d deleted by user while still queued" % job.id )
                elif job_state == JOB_ADMIN_DELETED:
                    log.info( "job %d deleted by admin while still queued" % job.id )
                else:
                    log.error( "unknown job state '%s' for job %d" % ( job_state, job.id ) )
                    if not self.track_jobs_in_database:
                        new_waiting_jobs.append( job.id )
            except Exception, e:
                log.exception( "failure running job %d" % job.id )
        # Update the waiting list
        self.waiting_jobs = new_waiting_jobs
        # Done with the session
        self.sa_session.remove()

    def __check_if_ready_to_run( self, job ):
        """
        Check if a job is ready to run by verifying that each of its input
        datasets is ready (specifically in the OK state). If any input dataset
        has an error, fail the job and return JOB_INPUT_ERROR. If any input
        dataset is deleted, fail the job and return JOB_INPUT_DELETED. If all
        input datasets are in OK state, return JOB_READY indicating that the
        job can be dispatched. Otherwise, return JOB_WAIT indicating that input
        datasets are still being prepared.
        """
        if job.state == model.Job.states.DELETED:
            return JOB_DELETED
        elif job.state == model.Job.states.ERROR:
            return JOB_ADMIN_DELETED
        for dataset_assoc in job.input_datasets:
            idata = dataset_assoc.dataset
            if not idata:
                continue
            # don't run jobs for which the input dataset was deleted
            if idata.deleted:
                JobWrapper( job, self ).fail( "input data %d (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) )
                return JOB_INPUT_DELETED
            # an error in the input data causes us to bail immediately
            elif idata.state == idata.states.ERROR:
                JobWrapper( job, self ).fail( "input data %d is in error state" % ( idata.hid ) )
                return JOB_INPUT_ERROR
            elif idata.state != idata.states.OK and not ( idata.state == idata.states.SETTING_METADATA and job.tool_id is not None and job.tool_id == self.app.datatypes_registry.set_external_metadata_tool.id ):
                # need to requeue
                return JOB_WAIT
        return JOB_READY

    def put( self, job_id, tool ):
        """Add a job to the queue (by job identifier)"""
        if not self.track_jobs_in_database:
            self.queue.put( ( job_id, tool.id ) )
            self.sleeper.wake()

    def shutdown( self ):
        """Attempts to gracefully shut down the worker thread"""
        if self.parent_pid != os.getpid():
            # We're not the real job queue, do nothing
            return
        else:
            log.info( "sending stop signal to worker thread" )
            self.running = False
            if not self.track_jobs_in_database:
                self.queue.put( self.STOP_SIGNAL )
            self.sleeper.wake()
            log.info( "job queue stopped" )
            self.dispatcher.shutdown()

class JobWrapper( object ):
    """
    Wraps a 'model.Job' with convenience methods for running processes and
    state management.
    """
    def __init__( self, job, queue ):
        self.job_id = job.id
        self.session_id = job.session_id
        self.user_id = job.user_id
        self.tool = queue.app.toolbox.tools_by_id.get( job.tool_id, None )
        self.queue = queue
        self.app = queue.app
        self.sa_session = self.app.model.context
        self.extra_filenames = []
        self.command_line = None
        self.galaxy_lib_dir = None
        # With job outputs in the working directory, we need the working
        # directory to be set before prepare is run, or else premature deletion
        # and job recovery fail.
        self.working_directory = \
            os.path.join( self.app.config.job_working_directory, str( self.job_id ) )
        self.output_paths = None
        self.tool_provided_job_metadata = None
        # Wrapper holding the info required to restore and clean up from files used for setting metadata externally
        self.external_output_metadata = metadata.JobExternalOutputMetadataWrapper( job )

    def get_param_dict( self ):
        """
        Restore the dictionary of parameters from the database.
        """
        job = self.sa_session.query( model.Job ).get( self.job_id )
        param_dict = dict( [ ( p.name, p.value ) for p in job.parameters ] )
        param_dict = self.tool.params_from_strings( param_dict, self.app )
        return param_dict

    def prepare( self ):
        """
        Prepare the job to run by creating the working directory and the
        config files.
        """
        self.sa_session.expunge_all() # this prevents the metadata reverting that has been seen in conjunction with the PBS job runner
        if not os.path.exists( self.working_directory ):
            os.mkdir( self.working_directory )
        # Restore parameters from the database
        job = self.sa_session.query( model.Job ).get( self.job_id )
        if job.user is None and job.galaxy_session is None:
            raise Exception( 'Job %s has no user and no session.' % job.id )
        incoming = dict( [ ( p.name, p.value ) for p in job.parameters ] )
        incoming = self.tool.params_from_strings( incoming, self.app )
        # Do any validation that could not be done at job creation
        self.tool.handle_unvalidated_param_values( incoming, self.app )
        # Restore input / output data lists
        inp_data = dict( [ ( da.name, da.dataset ) for da in job.input_datasets ] )
        out_data = dict( [ ( da.name, da.dataset ) for da in job.output_datasets ] )
        out_data.update( [ ( da.name, da.dataset ) for da in job.output_library_datasets ] )

        # Set up output dataset association for export history jobs. Because job
        # uses a Dataset rather than an HDA or LDA, it's necessary to set up a
        # fake dataset association that provides the needed attributes for
        # preparing a job.
        class FakeDatasetAssociation ( object ):
            def __init__( self, dataset=None ):
                self.dataset = dataset
                self.file_name = dataset.file_name
                self.metadata = dict()
                self.children = []
        jeha = self.sa_session.query( model.JobExportHistoryArchive ).filter_by( job=job ).first()
        if jeha:
            out_data[ "output_file" ] = FakeDatasetAssociation( dataset=jeha.dataset )
        # These can be passed on the command line if wanted as $userId $userEmail
        if job.history and job.history.user: # check for anonymous user!
            userId = '%d' % job.history.user.id
            userEmail = str(job.history.user.email)
        else:
            userId = 'Anonymous'
            userEmail = 'Anonymous'
        incoming['userId'] = userId
        incoming['userEmail'] = userEmail
        # Build params, done before hook so hook can use
        param_dict = self.tool.build_param_dict( incoming, inp_data, out_data, self.get_output_fnames(), self.working_directory )
        # Certain tools require tasks to be completed prior to job execution
        # ( this used to be performed in the "exec_before_job" hook, but hooks are deprecated ).
        self.tool.exec_before_job( self.queue.app, inp_data, out_data, param_dict )
        # Run the before queue ("exec_before_job") hook
        self.tool.call_hook( 'exec_before_job', self.queue.app, inp_data=inp_data,
                             out_data=out_data, tool=self.tool, param_dict=incoming)
        self.sa_session.flush()
        # Build any required config files
        config_filenames = self.tool.build_config_files( param_dict, self.working_directory )
        # FIXME: Build the param file (might return None, DEPRECATED)
        param_filename = self.tool.build_param_file( param_dict, self.working_directory )
        # Build the job's command line
        self.command_line = self.tool.build_command_line( param_dict )
        # FIXME: for now, tools get Galaxy's lib dir in their path
        if self.command_line and self.command_line.startswith( 'python' ):
            self.galaxy_lib_dir = os.path.abspath( "lib" ) # cwd = galaxy root
        # Shell fragment to inject dependencies
        if self.app.config.use_tool_dependencies:
            self.dependency_shell_commands = self.tool.build_dependency_shell_commands()
        else:
            self.dependency_shell_commands = None
        # We need command_line persisted to the db in order for Galaxy to re-queue the job
        # if the server was stopped and restarted before the job finished
        job.command_line = self.command_line
        self.sa_session.add( job )
        self.sa_session.flush()
        # Return list of all extra files
        extra_filenames = config_filenames
        if param_filename is not None:
            extra_filenames.append( param_filename )
        self.param_dict = param_dict
        self.extra_filenames = extra_filenames
        return extra_filenames

    def fail( self, message, exception=False ):
        """
        Indicate job failure by setting state and message on all output
        datasets.
        """
        job = self.sa_session.query( model.Job ).get( self.job_id )
        self.sa_session.refresh( job )
        # if the job was deleted, don't fail it
        if not job.state == model.Job.states.DELETED:
            # Check if the failure is due to an exception
            if exception:
                # Save the traceback immediately in case we generate another
                # below
                job.traceback = traceback.format_exc()
                # Get the exception and let the tool attempt to generate
                # a better message
                etype, evalue, tb = sys.exc_info()
                m = self.tool.handle_job_failure_exception( evalue )
                if m:
                    message = m
            if self.app.config.outputs_to_working_directory:
                for dataset_path in self.get_output_fnames():
                    try:
                        shutil.move( dataset_path.false_path, dataset_path.real_path )
                        log.debug( "fail(): Moved %s to %s" % ( dataset_path.false_path, dataset_path.real_path ) )
                    except ( IOError, OSError ), e:
                        log.error( "fail(): Missing output file in working directory: %s" % e )
            for dataset_assoc in job.output_datasets + job.output_library_datasets:
                dataset = dataset_assoc.dataset
                self.sa_session.refresh( dataset )
                dataset.state = dataset.states.ERROR
                dataset.blurb = 'tool error'
                dataset.info = message
                dataset.set_size()
                if dataset.ext == 'auto':
                    dataset.extension = 'data'
                self.sa_session.add( dataset )
                self.sa_session.flush()
            job.state = model.Job.states.ERROR
            job.command_line = self.command_line
            job.info = message
            self.sa_session.add( job )
            self.sa_session.flush()
        # If the job was deleted, call tool specific fail actions (used for e.g. external metadata) and clean up
        if self.tool:
            self.tool.job_failed( self, message, exception )
        self.cleanup()

    def change_state( self, state, info = False ):
        job = self.sa_session.query( model.Job ).get( self.job_id )
        self.sa_session.refresh( job )
        for dataset_assoc in job.output_datasets + job.output_library_datasets:
            dataset = dataset_assoc.dataset
            self.sa_session.refresh( dataset )
            dataset.state = state
            if info:
                dataset.info = info
            self.sa_session.add( dataset )
            self.sa_session.flush()
        if info:
            job.info = info
        job.state = state
        self.sa_session.add( job )
        self.sa_session.flush()

    def get_state( self ):
        job = self.sa_session.query( model.Job ).get( self.job_id )
        self.sa_session.refresh( job )
        return job.state

    def set_runner( self, runner_url, external_id ):
        job = self.sa_session.query( model.Job ).get( self.job_id )
        self.sa_session.refresh( job )
        job.job_runner_name = runner_url
        job.job_runner_external_id = external_id
        self.sa_session.add( job )
        self.sa_session.flush()

    def finish( self, stdout, stderr ):
        """
        Called to indicate that the associated command has been run. Updates
        the output datasets based on stderr and stdout from the command, and
        the contents of the output files.
        """
        # default post job setup
        self.sa_session.expunge_all()
        job = self.sa_session.query( model.Job ).get( self.job_id )
        # if the job was deleted, don't finish it
        if job.state == job.states.DELETED:
            self.cleanup()
            return
        elif job.state == job.states.ERROR:
            # Job was deleted by an administrator
            self.fail( job.info )
            return
        if stderr:
            job.state = 'error'
        else:
            job.state = 'ok'
        if self.app.config.outputs_to_working_directory:
            for dataset_path in self.get_output_fnames():
                try:
                    shutil.move( dataset_path.false_path, dataset_path.real_path )
                    log.debug( "finish(): Moved %s to %s" % ( dataset_path.false_path, dataset_path.real_path ) )
                except ( IOError, OSError ):
                    # this can happen if Galaxy is restarted during the job's
                    # finish method - the false_path file has already moved,
                    # and when the job is recovered, it won't be found.
                    if os.path.exists( dataset_path.real_path ) and os.stat( dataset_path.real_path ).st_size > 0:
                        log.warning( "finish(): %s not found, but %s is not empty, so it will be used instead" % ( dataset_path.false_path, dataset_path.real_path ) )
                    else:
                        self.fail( "Job %s's output dataset(s) could not be read" % job.id )
                        return
        job_context = ExpressionContext( dict( stdout = stdout, stderr = stderr ) )
        for dataset_assoc in job.output_datasets + job.output_library_datasets:
            context = self.get_dataset_finish_context( job_context, dataset_assoc.dataset.dataset )
            # should this also be checking library associations? - can a library item be added from a history before the job has ended? - let's not allow this to occur
            for dataset in dataset_assoc.dataset.dataset.history_associations + dataset_assoc.dataset.dataset.library_associations: # need to update all associated output hdas, i.e. history was shared with job running
                dataset.blurb = 'done'
                dataset.peek = 'no peek'
                dataset.info = context['stdout'] + context['stderr']
                dataset.set_size()
                if context['stderr']:
                    dataset.blurb = "error"
                elif dataset.has_data():
                    # If the tool was expected to set the extension, attempt to retrieve it
                    if dataset.ext == 'auto':
                        dataset.extension = context.get( 'ext', 'data' )
                        dataset.init_meta( copy_from=dataset )
                    # if a dataset was copied, it won't appear in our dictionary:
                    # either use the metadata from originating output dataset, or call set_meta on the copies
                    # it would be quicker to just copy the metadata from the originating output dataset,
                    # but somewhat trickier (need to recurse up the copied_from tree), for now we'll call set_meta()
                    if not self.app.config.set_metadata_externally or \
                       ( not self.external_output_metadata.external_metadata_set_successfully( dataset, self.sa_session ) \
                         and self.app.config.retry_metadata_internally ):
                        dataset.set_meta( overwrite = False )
                    elif not self.external_output_metadata.external_metadata_set_successfully( dataset, self.sa_session ) and not context['stderr']:
                        dataset._state = model.Dataset.states.FAILED_METADATA
                    else:
                        # load metadata from file
                        # we need to no longer allow metadata to be edited while the job is still running,
                        # since if it is edited, the metadata changed on the running output will no longer match
                        # the metadata that was stored to disk for use via the external process,
                        # and the changes made by the user will be lost, without warning or notice
                        dataset.metadata.from_JSON_dict( self.external_output_metadata.get_output_filenames_by_dataset( dataset, self.sa_session ).filename_out )
                    try:
                        assert context.get( 'line_count', None ) is not None
                        if ( not dataset.datatype.composite_type and dataset.dataset.is_multi_byte() ) or self.tool.is_multi_byte:
                            dataset.set_peek( line_count=context['line_count'], is_multi_byte=True )
                        else:
                            dataset.set_peek( line_count=context['line_count'] )
                    except:
                        if ( not dataset.datatype.composite_type and dataset.dataset.is_multi_byte() ) or self.tool.is_multi_byte:
                            dataset.set_peek( is_multi_byte=True )
                        else:
                            dataset.set_peek()
                    try:
                        # set the name if provided by the tool
                        dataset.name = context['name']
                    except:
                        pass
                else:
                    dataset.blurb = "empty"
                    if dataset.ext == 'auto':
                        dataset.extension = 'txt'
                self.sa_session.add( dataset )
            if context['stderr']:
                dataset_assoc.dataset.dataset.state = model.Dataset.states.ERROR
            else:
                dataset_assoc.dataset.dataset.state = model.Dataset.states.OK
            # If any of the rest of the finish method below raises an
            # exception, the fail method will run and set the datasets to
            # ERROR. The user will never see that the datasets are in error if
            # they were flushed as OK here, since upon doing so, the history
            # panel stops checking for updates. So allow the
            # self.sa_session.flush() at the bottom of this method to set
            # the state instead.

        for pja in job.post_job_actions:
            ActionBox.execute(self.app, self.sa_session, pja.post_job_action, job)
        # Flush all the dataset and job changes above. Dataset state changes
        # will now be seen by the user.
        self.sa_session.flush()
        # Save stdout and stderr
        if len( stdout ) > 32768:
            log.error( "stdout for job %d is greater than 32K, only first part will be logged to database" % job.id )
        job.stdout = stdout[:32768]
        if len( stderr ) > 32768:
            log.error( "stderr for job %d is greater than 32K, only first part will be logged to database" % job.id )
        job.stderr = stderr[:32768]
        # custom post process setup
        inp_data = dict( [ ( da.name, da.dataset ) for da in job.input_datasets ] )
        out_data = dict( [ ( da.name, da.dataset ) for da in job.output_datasets ] )
        out_data.update( [ ( da.name, da.dataset ) for da in job.output_library_datasets ] )
        param_dict = dict( [ ( p.name, p.value ) for p in job.parameters ] ) # why not re-use self.param_dict here? ##dunno...probably should, this causes tools.parameters.basic.UnvalidatedValue to be used in following methods instead of validated and transformed values during i.e. running workflows
        param_dict = self.tool.params_from_strings( param_dict, self.app )
        # Check for and move associated_files
        self.tool.collect_associated_files(out_data, self.working_directory)
        # Create generated output children and primary datasets and add to param_dict
        collected_datasets = {'children':self.tool.collect_child_datasets(out_data),'primary':self.tool.collect_primary_datasets(out_data)}
        param_dict.update({'__collected_datasets__':collected_datasets})
        # Certain tools require tasks to be completed after job execution
        # ( this used to be performed in the "exec_after_process" hook, but hooks are deprecated ).
        self.tool.exec_after_process( self.queue.app, inp_data, out_data, param_dict, job = job )
        # Call 'exec_after_process' hook
        self.tool.call_hook( 'exec_after_process', self.queue.app, inp_data=inp_data,
                             out_data=out_data, param_dict=param_dict,
                             tool=self.tool, stdout=stdout, stderr=stderr )
        job.command_line = self.command_line

        # fix permissions
        for path in [ dp.real_path for dp in self.get_output_fnames() ]:
            util.umask_fix_perms( path, self.app.config.umask, 0666, self.app.config.gid )
        self.sa_session.flush()
        log.debug( 'job %d ended' % self.job_id )
        self.cleanup()

    def cleanup( self ):
        # remove temporary files
        try:
            for fname in self.extra_filenames:
                os.remove( fname )
            if self.working_directory is not None:
                shutil.rmtree( self.working_directory )
            if self.app.config.set_metadata_externally:
                self.external_output_metadata.cleanup_external_metadata( self.sa_session )
            galaxy.tools.imp_exp.JobExportHistoryArchiveWrapper( self.job_id ).cleanup_after_job( self.sa_session )
        except:
            log.exception( "Unable to cleanup job %d" % self.job_id )

    def get_command_line( self ):
        return self.command_line

    def get_session_id( self ):
        return self.session_id

    def get_input_fnames( self ):
        job = self.sa_session.query( model.Job ).get( self.job_id )
        filenames = []
        for da in job.input_datasets: # da is a JobToInputDatasetAssociation object
            if da.dataset:
                filenames.append( da.dataset.file_name )
                # we will need to stage in metadata file names also
                # TODO: would be better to only stage in metadata files that are actually needed (found in command line, referenced in config files, etc.)
                for key, value in da.dataset.metadata.items():
                    if isinstance( value, model.MetadataFile ):
                        filenames.append( value.file_name )
        return filenames

    def get_output_fnames( self ):
        if self.output_paths is not None:
            return self.output_paths

        class DatasetPath( object ):
            def __init__( self, dataset_id, real_path, false_path = None ):
                self.dataset_id = dataset_id
                self.real_path = real_path
                self.false_path = false_path
            def __str__( self ):
                if self.false_path is None:
                    return self.real_path
                else:
                    return self.false_path

        job = self.sa_session.query( model.Job ).get( self.job_id )
        # Job output datasets are a combination of output datasets, library datasets, and jeha datasets.
        jeha = self.sa_session.query( model.JobExportHistoryArchive ).filter_by( job=job ).first()
        if self.app.config.outputs_to_working_directory:
            self.output_paths = []
            for name, data in [ ( da.name, da.dataset.dataset ) for da in job.output_datasets + job.output_library_datasets ]:
                false_path = os.path.abspath( os.path.join( self.working_directory, "galaxy_dataset_%d.dat" % data.id ) )
                self.output_paths.append( DatasetPath( data.id, data.file_name, false_path ) )
            if jeha:
                false_path = os.path.abspath( os.path.join( self.working_directory, "galaxy_dataset_%d.dat" % jeha.dataset.id ) )
                self.output_paths.append( DatasetPath( jeha.dataset.id, jeha.dataset.file_name, false_path ) )
        else:
            self.output_paths = [ DatasetPath( da.dataset.dataset.id, da.dataset.file_name ) for da in job.output_datasets + job.output_library_datasets ]
            if jeha:
                self.output_paths.append( DatasetPath( jeha.dataset.id, jeha.dataset.file_name ) )

        return self.output_paths
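
    # Illustrative sketch: with outputs_to_working_directory enabled, tools
    # write to a DatasetPath's false_path and finish()/fail() later move the
    # file to its real location; str() picks the path a tool should use. The
    # paths below are examples only:
    #
    #   dp = DatasetPath( 42, "/galaxy/files/000/dataset_42.dat",
    #                     false_path="/galaxy/job_working/7/galaxy_dataset_42.dat" )
    #   str( dp )  # -> the false_path when one is set, else the real_path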

    def get_output_file_id( self, file ):
        if self.output_paths is None:
            self.get_output_fnames()
        for dp in self.output_paths:
            if self.app.config.outputs_to_working_directory and os.path.basename( dp.false_path ) == file:
                return dp.dataset_id
            elif os.path.basename( dp.real_path ) == file:
                return dp.dataset_id
        return None

    def get_tool_provided_job_metadata( self ):
        if self.tool_provided_job_metadata is not None:
            return self.tool_provided_job_metadata

        # Look for JSONified job metadata
        self.tool_provided_job_metadata = []
        meta_file = os.path.join( self.working_directory, TOOL_PROVIDED_JOB_METADATA_FILE )
        if os.path.exists( meta_file ):
            for line in open( meta_file, 'r' ):
                try:
                    line = from_json_string( line )
                    assert 'type' in line
                except:
                    log.exception( '(%s) Got JSON data from tool, but data is improperly formatted or no "type" key in data' % self.job_id )
                    log.debug( 'Offending data was: %s' % line )
                    continue
                # Set the dataset id if it's a dataset entry and isn't set.
                # This isn't insecure. We loop the job's output datasets in
                # the finish method, so if a tool writes out metadata for a
                # dataset id that it doesn't own, it'll just be ignored.
                if line['type'] == 'dataset' and 'dataset_id' not in line:
                    try:
                        line['dataset_id'] = self.get_output_file_id( line['dataset'] )
                    except KeyError:
                        log.warning( '(%s) Tool provided job dataset-specific metadata without specifying a dataset' % self.job_id )
                        continue
                self.tool_provided_job_metadata.append( line )
        return self.tool_provided_job_metadata

    def get_dataset_finish_context( self, job_context, dataset ):
        for meta in self.get_tool_provided_job_metadata():
            if meta['type'] == 'dataset' and meta['dataset_id'] == dataset.id:
                return ExpressionContext( meta, job_context )
        return job_context

    def check_output_sizes( self ):
        sizes = []
        output_paths = self.get_output_fnames()
        for outfile in [ str( o ) for o in output_paths ]:
            sizes.append( ( outfile, os.stat( outfile ).st_size ) )
        return sizes
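
    # Illustrative sketch (assumed caller): a job runner can poll these sizes
    # while a job runs, e.g. to enforce a configured output size limit:
    #
    #   for outfile, size in job_wrapper.check_output_sizes():
    #       if size > limit:  # 'limit' is a hypothetical configured maximum
    #           ...           # stop the job and report the oversized output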

    def setup_external_metadata( self, exec_dir = None, tmp_dir = None, dataset_files_path = None, config_root = None, datatypes_config = None, set_extension = True, **kwds ):
        # extension could still be 'auto' if this is the upload tool.
        job = self.sa_session.query( model.Job ).get( self.job_id )
        if set_extension:
            for output_dataset_assoc in job.output_datasets:
                if output_dataset_assoc.dataset.ext == 'auto':
                    context = self.get_dataset_finish_context( dict(), output_dataset_assoc.dataset.dataset )
                    output_dataset_assoc.dataset.extension = context.get( 'ext', 'data' )
            self.sa_session.flush()
        if tmp_dir is None:
            # this dir should be relative to the exec_dir
            tmp_dir = self.app.config.new_file_path
        if dataset_files_path is None:
            dataset_files_path = self.app.model.Dataset.file_path
        if config_root is None:
            config_root = self.app.config.root
        if datatypes_config is None:
            datatypes_config = self.app.config.datatypes_config
        return self.external_output_metadata.setup_external_metadata( [ output_dataset_assoc.dataset for output_dataset_assoc in job.output_datasets ],
                                                                      self.sa_session,
                                                                      exec_dir = exec_dir,
                                                                      tmp_dir = tmp_dir,
                                                                      dataset_files_path = dataset_files_path,
                                                                      config_root = config_root,
                                                                      datatypes_config = datatypes_config,
                                                                      job_metadata = os.path.join( self.working_directory, TOOL_PROVIDED_JOB_METADATA_FILE ),
                                                                      **kwds )
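
    # Illustrative sketch (an assumption about caller behavior, not confirmed
    # by this file): a job runner that sets metadata externally is expected to
    # take whatever JobExternalOutputMetadataWrapper.setup_external_metadata()
    # returns here and arrange for that external metadata-setting step to run
    # alongside the job, e.g.:
    #
    #   external_metadata = job_wrapper.setup_external_metadata( exec_dir=os.getcwd() )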

    @property
    def user( self ):
        job = self.sa_session.query( model.Job ).get( self.job_id )
        if job.user is not None:
            return job.user.email
        elif job.galaxy_session is not None and job.galaxy_session.user is not None:
            return job.galaxy_session.user.email
        elif job.history is not None and job.history.user is not None:
            return job.history.user.email
        elif job.galaxy_session is not None:
            return 'anonymous@' + job.galaxy_session.remote_addr.split()[-1]
        else:
            return 'anonymous@unknown'

class DefaultJobDispatcher( object ):
    def __init__( self, app ):
        self.app = app
        self.job_runners = {}
        start_job_runners = ["local"]
        if app.config.start_job_runners is not None:
            start_job_runners.extend( app.config.start_job_runners.split(",") )
        for runner_name in start_job_runners:
            if runner_name == "local":
                import runners.local
                self.job_runners[runner_name] = runners.local.LocalJobRunner( app )
            elif runner_name == "pbs":
                import runners.pbs
                self.job_runners[runner_name] = runners.pbs.PBSJobRunner( app )
            elif runner_name == "sge":
                import runners.sge
                self.job_runners[runner_name] = runners.sge.SGEJobRunner( app )
            elif runner_name == "drmaa":
                import runners.drmaa
                self.job_runners[runner_name] = runners.drmaa.DRMAAJobRunner( app )
            else:
                log.error( "Unable to start unknown job runner: %s" % runner_name )

    def put( self, job_wrapper ):
        runner_name = ( job_wrapper.tool.job_runner.split(":", 1) )[0]
        log.debug( "dispatching job %d to %s runner" % ( job_wrapper.job_id, runner_name ) )
        self.job_runners[runner_name].put( job_wrapper )
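
    # Illustrative sketch: a tool's job_runner is a URL whose scheme (the text
    # before the first ':') names a runner started in __init__. For example, a
    # value like "local:///" dispatches to self.job_runners["local"].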

    def stop( self, job ):
        runner_name = ( job.job_runner_name.split(":", 1) )[0]
        log.debug( "stopping job %d in %s runner" % ( job.id, runner_name ) )
        self.job_runners[runner_name].stop_job( job )

    def recover( self, job, job_wrapper ):
        runner_name = ( job.job_runner_name.split(":", 1) )[0]
        log.debug( "recovering job %d in %s runner" % ( job.id, runner_name ) )
        self.job_runners[runner_name].recover( job, job_wrapper )

    def shutdown( self ):
        for runner in self.job_runners.itervalues():
            runner.shutdown()

class JobStopQueue( object ):
    """
    A queue for jobs which need to be terminated prematurely.
    """
    STOP_SIGNAL = object()
    def __init__( self, app, dispatcher ):
        self.app = app
        self.sa_session = app.model.context
        self.dispatcher = dispatcher

        # Keep track of the pid that started the job manager, only it
        # has valid threads
        self.parent_pid = os.getpid()
        # Contains jobs to be stopped; unlike JobQueue, this queue is always
        # used, even when track_jobs_in_database is True
        self.queue = Queue()

        # Contains jobs that are waiting (only use from monitor thread)
        self.waiting = []

        # Helper for interruptable sleep
        self.sleeper = Sleeper()
        self.running = True
        self.monitor_thread = threading.Thread( target=self.monitor )
        self.monitor_thread.start()
        log.info( "job stopper started" )

    def monitor( self ):
        """
        Continually iterate the waiting jobs, stopping any that are found.
        """
        # HACK: Delay until after forking, we need a way to do post fork notification!!!
        time.sleep( 10 )
        while self.running:
            try:
                self.monitor_step()
            except:
                log.exception( "Exception in monitor_step" )
            # Sleep
            self.sleeper.sleep( 1 )

    def monitor_step( self ):
        """
        Called repeatedly by `monitor` to stop jobs.
        """
        # Pull all new jobs from the queue at once
        jobs = []
        try:
            while 1:
                ( job_id, error_msg ) = self.queue.get_nowait()
                if job_id is self.STOP_SIGNAL:
                    return
                # Append to watch queue
                jobs.append( ( job_id, error_msg ) )
        except Empty:
            pass

        for job_id, error_msg in jobs:
            job = self.sa_session.query( model.Job ).get( job_id )
            self.sa_session.refresh( job )
            # if desired, error the job so we can inform the user.
            if error_msg is not None:
                job.state = job.states.ERROR
                job.info = error_msg
            else:
                job.state = job.states.DELETED
            self.sa_session.add( job )
            self.sa_session.flush()
            # if job is in JobQueue or FooJobRunner's put method,
            # job_runner_name will be unset and the job will be dequeued due to
            # state change above
            if job.job_runner_name is not None:
                # tell the dispatcher to stop the job
                self.dispatcher.stop( job )

    def put( self, job_id, error_msg=None ):
        self.queue.put( ( job_id, error_msg ) )
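
    # Illustrative sketch: a plain put() marks the job DELETED; supplying
    # error_msg marks it ERROR with that message (handled in monitor_step
    # above). The message text is an example only:
    #
    #   stop_queue.put( job.id )
    #   stop_queue.put( job.id, error_msg="Job cancelled by an administrator" )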

    def shutdown( self ):
        """Attempts to gracefully shut down the worker thread"""
        if self.parent_pid != os.getpid():
            # We're not the real job queue, do nothing
            return
        else:
            log.info( "sending stop signal to worker thread" )
            self.running = False
            self.queue.put( ( self.STOP_SIGNAL, None ) )
            self.sleeper.wake()
            log.info( "job stopper stopped" )

class NoopQueue( object ):
    """
    Implements the JobQueue / JobStopQueue interface but does nothing
    """
    def put( self, *args ):
        return
    def shutdown( self ):
        return