root/galaxy-central/lib/galaxy/jobs/__init__.py @ 2

Revision 2, 44.2 KB (committer: hatakeyama, 14 years ago)

Log message: import galaxy-central

import logging, threading, sys, os, time, subprocess, string, tempfile, re, traceback, shutil

import galaxy
from galaxy import util, model
from galaxy.model.orm import lazyload
from galaxy.datatypes.tabular import *
from galaxy.datatypes.interval import *
from galaxy.datatypes import metadata
from galaxy.util.json import from_json_string
from galaxy.util.expressions import ExpressionContext
from galaxy.jobs.actions.post import ActionBox

import pkg_resources
pkg_resources.require( "PasteDeploy" )

from paste.deploy.converters import asbool

from Queue import Queue, Empty

log = logging.getLogger( __name__ )

# States for running a job. These are NOT the same as data states
JOB_WAIT, JOB_ERROR, JOB_INPUT_ERROR, JOB_INPUT_DELETED, JOB_READY, JOB_DELETED, JOB_ADMIN_DELETED = 'wait', 'error', 'input_error', 'input_deleted', 'ready', 'deleted', 'admin_deleted'

# This file, if created in the job's working directory, will be used for
# setting advanced metadata properties on the job and its associated outputs.
# This interface is currently experimental, is only used by the upload tool,
# and should eventually become API'd
TOOL_PROVIDED_JOB_METADATA_FILE = 'galaxy.json'

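# A hypothetical example of one line a tool might write to galaxy.json
# (illustrative only; the keys a tool actually emits can vary):
#
#     {"type": "dataset", "dataset": "galaxy_dataset_2.dat", "ext": "tabular", "name": "Loaded table", "line_count": 100}
#
# Each line of the file is parsed independently with from_json_string() in
# JobWrapper.get_tool_provided_job_metadata() below and must carry a "type"
# key; for "dataset" entries the "dataset" value is matched against the job's
# output file names to resolve a dataset id.
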
class JobManager( object ):
    """
    Highest level interface to job management.

    TODO: Currently the app accesses "job_queue" and "job_stop_queue" directly.
          This should be decoupled.
    """
    def __init__( self, app ):
        self.app = app
        if self.app.config.get_bool( "enable_job_running", True ):
            # The dispatcher launches the underlying job runners
            self.dispatcher = DefaultJobDispatcher( app )
            # Queues for starting and stopping jobs
            self.job_queue = JobQueue( app, self.dispatcher )
            self.job_stop_queue = JobStopQueue( app, self.dispatcher )
        else:
            self.job_queue = self.job_stop_queue = NoopQueue()
    def shutdown( self ):
        self.job_queue.shutdown()
        self.job_stop_queue.shutdown()

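# How the application is expected to use the manager above (an illustrative
# sketch only; as the TODO in the docstring notes, the app currently reaches
# into the two queues directly):
#
#     app.job_manager = JobManager( app )
#     app.job_manager.job_queue.put( job.id, tool )            # enqueue a new job
#     app.job_manager.job_stop_queue.put( job.id, error_msg )  # request early termination
#     app.job_manager.shutdown()                               # at server exit
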
class Sleeper( object ):
    """
    Provides a 'sleep' method that sleeps for a number of seconds *unless*
    the wake method is called (from a different thread).
    """
    def __init__( self ):
        self.condition = threading.Condition()
    def sleep( self, seconds ):
        self.condition.acquire()
        self.condition.wait( seconds )
        self.condition.release()
    def wake( self ):
        self.condition.acquire()
        self.condition.notify()
        self.condition.release()

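# A minimal, self-contained sketch of the pattern the queues below follow with
# Sleeper: a worker loop sleeps between polling passes, and another thread can
# cut the wait short by calling wake().  This helper is illustrative only and
# is not used by the rest of the module.
def _sleeper_usage_sketch():
    sleeper = Sleeper()
    done = []
    def worker():
        # poll until told to stop, sleeping up to a second between passes
        while not done:
            sleeper.sleep( 1 )
    t = threading.Thread( target=worker )
    t.start()
    done.append( True )  # ask the loop to exit ...
    sleeper.wake()       # ... and interrupt any sleep in progress
    t.join()
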
class JobQueue( object ):
    """
    Job manager, waits for jobs to be runnable and then dispatches to
    a JobRunner.
    """
    STOP_SIGNAL = object()
    def __init__( self, app, dispatcher ):
        """Start the job manager"""
        self.app = app
        self.sa_session = app.model.context
        self.job_lock = False
        # Should we read jobs from the database, or use an in-memory queue?
        self.track_jobs_in_database = app.config.get_bool( 'track_jobs_in_database', False )
        # Keep track of the pid that started the job manager, only it
        # has valid threads
        self.parent_pid = os.getpid()
        # Contains new jobs. Note this is not used if track_jobs_in_database is True
        self.queue = Queue()

        # Contains jobs that are waiting (only use from monitor thread)
        ## This and jobs_to_check[] are closest to a "Job Queue"
        self.waiting_jobs = []

        # Helper for interruptable sleep
        self.sleeper = Sleeper()
        self.running = True
        self.dispatcher = dispatcher
        self.monitor_thread = threading.Thread( target=self.__monitor )
        self.monitor_thread.start()
        log.info( "job manager started" )
        if app.config.get_bool( 'enable_job_recovery', True ):
            self.__check_jobs_at_startup()

    def __check_jobs_at_startup( self ):
        """
        Checks all jobs that are in the 'new', 'queued' or 'running' state in
        the database and requeues or cleans up as necessary.  Only run as the
        job manager starts.
        """
        model = self.app.model
        for job in self.sa_session.query( model.Job ).filter( model.Job.state == model.Job.states.NEW ):
            if job.tool_id not in self.app.toolbox.tools_by_id:
                log.warning( "Tool '%s' removed from tool config, unable to recover job: %s" % ( job.tool_id, job.id ) )
                JobWrapper( job, self ).fail( 'This tool was disabled before the job completed.  Please contact your Galaxy administrator.' )
            else:
                log.debug( "no runner: %s is still in new state, adding to the jobs queue" %job.id )
                self.queue.put( ( job.id, job.tool_id ) )
        for job in self.sa_session.query( model.Job ).filter( ( model.Job.state == model.Job.states.RUNNING ) | ( model.Job.state == model.Job.states.QUEUED ) ):
            if job.tool_id not in self.app.toolbox.tools_by_id:
                log.warning( "Tool '%s' removed from tool config, unable to recover job: %s" % ( job.tool_id, job.id ) )
                JobWrapper( job, self ).fail( 'This tool was disabled before the job completed.  Please contact your Galaxy administrator.' )
            elif job.job_runner_name is None:
                log.debug( "no runner: %s is still in queued state, adding to the jobs queue" %job.id )
                if self.track_jobs_in_database:
                    job.state = model.Job.states.NEW
                else:
                    self.queue.put( ( job.id, job.tool_id ) )
            else:
                job_wrapper = JobWrapper( job, self )
                self.dispatcher.recover( job, job_wrapper )
        if self.sa_session.dirty:
            self.sa_session.flush()

    def __monitor( self ):
        """
        Continually iterate the waiting jobs, checking if each is ready to
        run and dispatching if so.
        """
        # HACK: Delay until after forking, we need a way to do post fork notification!!!
        time.sleep( 10 )
        while self.running:
            try:
                self.__monitor_step()
            except:
                log.exception( "Exception in monitor_step" )
            # Sleep
            self.sleeper.sleep( 1 )

    def __monitor_step( self ):
        """
        Called repeatedly by `monitor` to process waiting jobs. Gets any new
        jobs (either from the database or from its own queue), then iterates
        over all new and waiting jobs to check the state of the jobs each
        depends on. If the job has dependencies that have not finished, it
        goes to the waiting queue. If the job has dependencies with errors,
        it is marked as having errors and removed from the queue. Otherwise,
        the job is dispatched.
        """
        # Pull all new jobs from the queue at once
        jobs_to_check = []
        if self.track_jobs_in_database:
            # Clear the session so we get fresh states for job and all datasets
            self.sa_session.expunge_all()
            # Fetch all new jobs
            jobs_to_check = self.sa_session.query( model.Job ) \
                                .options( lazyload( "external_output_metadata" ), lazyload( "parameters" ) ) \
                                .filter( model.Job.state == model.Job.states.NEW ).all()
        else:
            try:
                while 1:
                    message = self.queue.get_nowait()
                    if message is self.STOP_SIGNAL:
                        return
                    # Unpack the message
                    job_id, tool_id = message
                    # Get the job object and append to watch queue
                    jobs_to_check.append( self.sa_session.query( model.Job ).get( job_id ) )
            except Empty:
                pass
            # Get job objects and append to watch queue for any which were
            # previously waiting
            for job_id in self.waiting_jobs:
                jobs_to_check.append( self.sa_session.query( model.Job ).get( job_id ) )
        # Iterate over new and waiting jobs and look for any that are
        # ready to run
        new_waiting_jobs = []
        for job in jobs_to_check:
            try:
                # Check the job's dependencies, requeue if they're not done
                job_state = self.__check_if_ready_to_run( job )
                if job_state == JOB_WAIT:
                    if not self.track_jobs_in_database:
                        new_waiting_jobs.append( job.id )
                elif job_state == JOB_INPUT_ERROR:
                    log.info( "job %d unable to run: one or more inputs in error state" % job.id )
                elif job_state == JOB_INPUT_DELETED:
                    log.info( "job %d unable to run: one or more inputs deleted" % job.id )
                elif job_state == JOB_READY:
                    if self.job_lock:
                        log.info( "Job dispatch attempted for %s, but prevented by administrative lock." % job.id )
                        if not self.track_jobs_in_database:
                            new_waiting_jobs.append( job.id )
                    else:
                        self.dispatcher.put( JobWrapper( job, self ) )
                        log.info( "job %d dispatched" % job.id )
                elif job_state == JOB_DELETED:
                    log.info( "job %d deleted by user while still queued" % job.id )
                elif job_state == JOB_ADMIN_DELETED:
                    log.info( "job %d deleted by admin while still queued" % job.id )
                else:
                    log.error( "unknown job state '%s' for job %d" % ( job_state, job.id ) )
                    if not self.track_jobs_in_database:
                        new_waiting_jobs.append( job.id )
            except Exception, e:
                log.exception( "failure running job %d" % job.id )
        # Update the waiting list
        self.waiting_jobs = new_waiting_jobs
        # Done with the session
        self.sa_session.remove()

    def __check_if_ready_to_run( self, job ):
        """
        Check if a job is ready to run by verifying that each of its input
        datasets is ready (specifically in the OK state). If any input dataset
        has an error, fail the job and return JOB_INPUT_ERROR. If any input
        dataset is deleted, fail the job and return JOB_INPUT_DELETED.  If all
        input datasets are in OK state, return JOB_READY indicating that the
        job can be dispatched. Otherwise, return JOB_WAIT indicating that input
        datasets are still being prepared.
        """
        if job.state == model.Job.states.DELETED:
            return JOB_DELETED
        elif job.state == model.Job.states.ERROR:
            return JOB_ADMIN_DELETED
        for dataset_assoc in job.input_datasets:
            idata = dataset_assoc.dataset
            if not idata:
                continue
            # don't run jobs for which the input dataset was deleted
            if idata.deleted:
                JobWrapper( job, self ).fail( "input data %d (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) )
                return JOB_INPUT_DELETED
            # an error in the input data causes us to bail immediately
            elif idata.state == idata.states.ERROR:
                JobWrapper( job, self ).fail( "input data %d is in error state" % ( idata.hid ) )
                return JOB_INPUT_ERROR
            elif idata.state != idata.states.OK and not ( idata.state == idata.states.SETTING_METADATA and job.tool_id is not None and job.tool_id == self.app.datatypes_registry.set_external_metadata_tool.id ):
                # need to requeue
                return JOB_WAIT
        return JOB_READY

    def put( self, job_id, tool ):
        """Add a job to the queue (by job identifier)"""
        if not self.track_jobs_in_database:
            self.queue.put( ( job_id, tool.id ) )
            self.sleeper.wake()

    def shutdown( self ):
        """Attempts to gracefully shut down the worker thread"""
        if self.parent_pid != os.getpid():
            # We're not the real job queue, do nothing
            return
        else:
            log.info( "sending stop signal to worker thread" )
            self.running = False
            if not self.track_jobs_in_database:
                self.queue.put( self.STOP_SIGNAL )
            self.sleeper.wake()
            log.info( "job queue stopped" )
            self.dispatcher.shutdown()

class JobWrapper( object ):
    """
    Wraps a 'model.Job' with convenience methods for running processes and
    state management.
    """
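    # How a job runner typically drives this wrapper (an illustrative outline
    # only; the concrete call sequences live in galaxy.jobs.runners.*):
    #
    #     wrapper.prepare()                         # working dir, config files, command line
    #     ... run wrapper.get_command_line() ...
    #     wrapper.finish( stdout, stderr )          # on normal completion
    #     wrapper.fail( message, exception=True )   # on error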
    def __init__( self, job, queue ):
        self.job_id = job.id
        self.session_id = job.session_id
        self.user_id = job.user_id
        self.tool = queue.app.toolbox.tools_by_id.get( job.tool_id, None )
        self.queue = queue
        self.app = queue.app
        self.sa_session = self.app.model.context
        self.extra_filenames = []
        self.command_line = None
        self.galaxy_lib_dir = None
        # With job outputs in the working directory, we need the working
        # directory to be set before prepare is run, or else premature deletion
        # and job recovery fail.
        self.working_directory = \
            os.path.join( self.app.config.job_working_directory, str( self.job_id ) )
        self.output_paths = None
        self.tool_provided_job_metadata = None
        # Wrapper holding the info required to restore and clean up from files used for setting metadata externally
        self.external_output_metadata = metadata.JobExternalOutputMetadataWrapper( job )

    def get_param_dict( self ):
        """
        Restore the dictionary of parameters from the database.
        """
        job = self.sa_session.query( model.Job ).get( self.job_id )
        param_dict = dict( [ ( p.name, p.value ) for p in job.parameters ] )
        param_dict = self.tool.params_from_strings( param_dict, self.app )
        return param_dict

    def prepare( self ):
        """
        Prepare the job to run by creating the working directory and the
        config files.
        """
        self.sa_session.expunge_all() #this prevents the metadata reverting that has been seen in conjunction with the PBS job runner
        if not os.path.exists( self.working_directory ):
            os.mkdir( self.working_directory )
        # Restore parameters from the database
        job = self.sa_session.query( model.Job ).get( self.job_id )
        if job.user is None and job.galaxy_session is None:
            raise Exception( 'Job %s has no user and no session.' % job.id )
        incoming = dict( [ ( p.name, p.value ) for p in job.parameters ] )
        incoming = self.tool.params_from_strings( incoming, self.app )
        # Do any validation that could not be done at job creation
        self.tool.handle_unvalidated_param_values( incoming, self.app )
        # Restore input / output data lists
        inp_data = dict( [ ( da.name, da.dataset ) for da in job.input_datasets ] )
        out_data = dict( [ ( da.name, da.dataset ) for da in job.output_datasets ] )
        out_data.update( [ ( da.name, da.dataset ) for da in job.output_library_datasets ] )

        # Set up output dataset association for export history jobs. Because job
        # uses a Dataset rather than an HDA or LDA, it's necessary to set up a
        # fake dataset association that provides the needed attributes for
        # preparing a job.
        class FakeDatasetAssociation ( object ):
            def __init__( self, dataset=None ):
                self.dataset = dataset
                self.file_name = dataset.file_name
                self.metadata = dict()
                self.children = []
        jeha = self.sa_session.query( model.JobExportHistoryArchive ).filter_by( job=job ).first()
        if jeha:
            out_data[ "output_file" ] = FakeDatasetAssociation( dataset=jeha.dataset )
        # These can be passed on the command line if wanted as $userId $userEmail
        if job.history and job.history.user: # check for anonymous user!
            userId = '%d' % job.history.user.id
            userEmail = str(job.history.user.email)
        else:
            userId = 'Anonymous'
            userEmail = 'Anonymous'
        incoming['userId'] = userId
        incoming['userEmail'] = userEmail
        # Build params, done before hook so hook can use
        param_dict = self.tool.build_param_dict( incoming, inp_data, out_data, self.get_output_fnames(), self.working_directory )
        # Certain tools require tasks to be completed prior to job execution
        # ( this used to be performed in the "exec_before_job" hook, but hooks are deprecated ).
        self.tool.exec_before_job( self.queue.app, inp_data, out_data, param_dict )
        # Run the before queue ("exec_before_job") hook
        self.tool.call_hook( 'exec_before_job', self.queue.app, inp_data=inp_data,
                             out_data=out_data, tool=self.tool, param_dict=incoming)
        self.sa_session.flush()
        # Build any required config files
        config_filenames = self.tool.build_config_files( param_dict, self.working_directory )
        # FIXME: Build the param file (might return None, DEPRECATED)
        param_filename = self.tool.build_param_file( param_dict, self.working_directory )
        # Build the job's command line
        self.command_line = self.tool.build_command_line( param_dict )
        # FIXME: for now, tools get Galaxy's lib dir in their path
        if self.command_line and self.command_line.startswith( 'python' ):
            self.galaxy_lib_dir = os.path.abspath( "lib" ) # cwd = galaxy root
        # Shell fragment to inject dependencies
        if self.app.config.use_tool_dependencies:
            self.dependency_shell_commands = self.tool.build_dependency_shell_commands()
        else:
            self.dependency_shell_commands = None
        # We need command_line persisted to the db in order for Galaxy to re-queue the job
        # if the server was stopped and restarted before the job finished
        job.command_line = self.command_line
        self.sa_session.add( job )
        self.sa_session.flush()
        # Return list of all extra files
        extra_filenames = config_filenames
        if param_filename is not None:
            extra_filenames.append( param_filename )
        self.param_dict = param_dict
        self.extra_filenames = extra_filenames
        return extra_filenames

    def fail( self, message, exception=False ):
        """
        Indicate job failure by setting state and message on all output
        datasets.
        """
        job = self.sa_session.query( model.Job ).get( self.job_id )
        self.sa_session.refresh( job )
        # if the job was deleted, don't fail it
        if not job.state == model.Job.states.DELETED:
            # Check if the failure is due to an exception
            if exception:
                # Save the traceback immediately in case we generate another
                # below
                job.traceback = traceback.format_exc()
                # Get the exception and let the tool attempt to generate
                # a better message
                etype, evalue, tb =  sys.exc_info()
                m = self.tool.handle_job_failure_exception( evalue )
                if m:
                    message = m
            if self.app.config.outputs_to_working_directory:
                for dataset_path in self.get_output_fnames():
                    try:
                        shutil.move( dataset_path.false_path, dataset_path.real_path )
                        log.debug( "fail(): Moved %s to %s" % ( dataset_path.false_path, dataset_path.real_path ) )
                    except ( IOError, OSError ), e:
                        log.error( "fail(): Missing output file in working directory: %s" % e )
            for dataset_assoc in job.output_datasets + job.output_library_datasets:
                dataset = dataset_assoc.dataset
                self.sa_session.refresh( dataset )
                dataset.state = dataset.states.ERROR
                dataset.blurb = 'tool error'
                dataset.info = message
                dataset.set_size()
                if dataset.ext == 'auto':
                    dataset.extension = 'data'
                self.sa_session.add( dataset )
                self.sa_session.flush()
            job.state = model.Job.states.ERROR
            job.command_line = self.command_line
            job.info = message
            self.sa_session.add( job )
            self.sa_session.flush()
        # If the job was deleted, call tool specific fail actions (used for e.g. external metadata) and clean up
        if self.tool:
            self.tool.job_failed( self, message, exception )
        self.cleanup()

    def change_state( self, state, info = False ):
        job = self.sa_session.query( model.Job ).get( self.job_id )
        self.sa_session.refresh( job )
        for dataset_assoc in job.output_datasets + job.output_library_datasets:
            dataset = dataset_assoc.dataset
            self.sa_session.refresh( dataset )
            dataset.state = state
            if info:
                dataset.info = info
            self.sa_session.add( dataset )
            self.sa_session.flush()
        if info:
            job.info = info
        job.state = state
        self.sa_session.add( job )
        self.sa_session.flush()

    def get_state( self ):
        job = self.sa_session.query( model.Job ).get( self.job_id )
        self.sa_session.refresh( job )
        return job.state

    def set_runner( self, runner_url, external_id ):
        job = self.sa_session.query( model.Job ).get( self.job_id )
        self.sa_session.refresh( job )
        job.job_runner_name = runner_url
        job.job_runner_external_id = external_id
        self.sa_session.add( job )
        self.sa_session.flush()

    def finish( self, stdout, stderr ):
        """
        Called to indicate that the associated command has been run. Updates
        the output datasets based on stderr and stdout from the command, and
        the contents of the output files.
        """
        # default post job setup
        self.sa_session.expunge_all()
        job = self.sa_session.query( model.Job ).get( self.job_id )
        # if the job was deleted, don't finish it
        if job.state == job.states.DELETED:
            self.cleanup()
            return
        elif job.state == job.states.ERROR:
            # Job was deleted by an administrator
            self.fail( job.info )
            return
        if stderr:
            job.state = "error"
        else:
            job.state = 'ok'
        if self.app.config.outputs_to_working_directory:
            for dataset_path in self.get_output_fnames():
                try:
                    shutil.move( dataset_path.false_path, dataset_path.real_path )
                    log.debug( "finish(): Moved %s to %s" % ( dataset_path.false_path, dataset_path.real_path ) )
                except ( IOError, OSError ):
                    # this can happen if Galaxy is restarted during the job's
                    # finish method - the false_path file has already moved,
                    # and when the job is recovered, it won't be found.
                    if os.path.exists( dataset_path.real_path ) and os.stat( dataset_path.real_path ).st_size > 0:
                        log.warning( "finish(): %s not found, but %s is not empty, so it will be used instead" % ( dataset_path.false_path, dataset_path.real_path ) )
                    else:
                        self.fail( "Job %s's output dataset(s) could not be read" % job.id )
                        return
        job_context = ExpressionContext( dict( stdout = stdout, stderr = stderr ) )
        for dataset_assoc in job.output_datasets + job.output_library_datasets:
            context = self.get_dataset_finish_context( job_context, dataset_assoc.dataset.dataset )
            #should this also be checking library associations? - can a library item be added from a history before the job has ended? - lets not allow this to occur
            for dataset in dataset_assoc.dataset.dataset.history_associations + dataset_assoc.dataset.dataset.library_associations: #need to update all associated output hdas, i.e. history was shared with job running
                dataset.blurb = 'done'
                dataset.peek  = 'no peek'
                dataset.info  = context['stdout'] + context['stderr']
                dataset.set_size()
                if context['stderr']:
                    dataset.blurb = "error"
                elif dataset.has_data():
                    # If the tool was expected to set the extension, attempt to retrieve it
                    if dataset.ext == 'auto':
                        dataset.extension = context.get( 'ext', 'data' )
                        dataset.init_meta( copy_from=dataset )
                    #if a dataset was copied, it won't appear in our dictionary:
                    #either use the metadata from originating output dataset, or call set_meta on the copies
                    #it would be quicker to just copy the metadata from the originating output dataset,
                    #but somewhat trickier (need to recurse up the copied_from tree), for now we'll call set_meta()
                    if not self.app.config.set_metadata_externally or \
                     ( not self.external_output_metadata.external_metadata_set_successfully( dataset, self.sa_session ) \
                       and self.app.config.retry_metadata_internally ):
                        dataset.set_meta( overwrite = False )
                    elif not self.external_output_metadata.external_metadata_set_successfully( dataset, self.sa_session ) and not context['stderr']:
                        dataset._state = model.Dataset.states.FAILED_METADATA
                    else:
                        #load metadata from file
                        #we need to no longer allow metadata to be edited while the job is still running,
                        #since if it is edited, the metadata changed on the running output will no longer match
                        #the metadata that was stored to disk for use via the external process,
                        #and the changes made by the user will be lost, without warning or notice
                        dataset.metadata.from_JSON_dict( self.external_output_metadata.get_output_filenames_by_dataset( dataset, self.sa_session ).filename_out )
                    try:
                        assert context.get( 'line_count', None ) is not None
                        if ( not dataset.datatype.composite_type and dataset.dataset.is_multi_byte() ) or self.tool.is_multi_byte:
                            dataset.set_peek( line_count=context['line_count'], is_multi_byte=True )
                        else:
                            dataset.set_peek( line_count=context['line_count'] )
                    except:
                        if ( not dataset.datatype.composite_type and dataset.dataset.is_multi_byte() ) or self.tool.is_multi_byte:
                            dataset.set_peek( is_multi_byte=True )
                        else:
                            dataset.set_peek()
                    try:
                        # set the name if provided by the tool
                        dataset.name = context['name']
                    except:
                        pass
                else:
                    dataset.blurb = "empty"
                    if dataset.ext == 'auto':
                        dataset.extension = 'txt'
                self.sa_session.add( dataset )
            if context['stderr']:
                dataset_assoc.dataset.dataset.state = model.Dataset.states.ERROR
            else:
                dataset_assoc.dataset.dataset.state = model.Dataset.states.OK
            # If any of the rest of the finish method below raises an
            # exception, the fail method will run and set the datasets to
            # ERROR.  The user will never see that the datasets are in error if
            # they were flushed as OK here, since upon doing so, the history
            # panel stops checking for updates.  So allow the
            # self.sa_session.flush() at the bottom of this method to set
            # the state instead.

        for pja in job.post_job_actions:
            ActionBox.execute(self.app, self.sa_session, pja.post_job_action, job)
        # Flush all the dataset and job changes above.  Dataset state changes
        # will now be seen by the user.
        self.sa_session.flush()
        # Save stdout and stderr
        if len( stdout ) > 32768:
            log.error( "stdout for job %d is greater than 32K, only first part will be logged to database" % job.id )
        job.stdout = stdout[:32768]
        if len( stderr ) > 32768:
            log.error( "stderr for job %d is greater than 32K, only first part will be logged to database" % job.id )
        job.stderr = stderr[:32768]
        # custom post process setup
        inp_data = dict( [ ( da.name, da.dataset ) for da in job.input_datasets ] )
        out_data = dict( [ ( da.name, da.dataset ) for da in job.output_datasets ] )
        out_data.update( [ ( da.name, da.dataset ) for da in job.output_library_datasets ] )
        param_dict = dict( [ ( p.name, p.value ) for p in job.parameters ] ) # why not re-use self.param_dict here? ##dunno...probably should, this causes tools.parameters.basic.UnvalidatedValue to be used in following methods instead of validated and transformed values during i.e. running workflows
        param_dict = self.tool.params_from_strings( param_dict, self.app )
        # Check for and move associated_files
        self.tool.collect_associated_files(out_data, self.working_directory)
        # Create generated output children and primary datasets and add to param_dict
        collected_datasets = {'children':self.tool.collect_child_datasets(out_data),'primary':self.tool.collect_primary_datasets(out_data)}
        param_dict.update({'__collected_datasets__':collected_datasets})
        # Certain tools require tasks to be completed after job execution
        # ( this used to be performed in the "exec_after_process" hook, but hooks are deprecated ).
        self.tool.exec_after_process( self.queue.app, inp_data, out_data, param_dict, job = job )
        # Call 'exec_after_process' hook
        self.tool.call_hook( 'exec_after_process', self.queue.app, inp_data=inp_data,
                             out_data=out_data, param_dict=param_dict,
                             tool=self.tool, stdout=stdout, stderr=stderr )
        job.command_line = self.command_line

        # fix permissions
        for path in [ dp.real_path for dp in self.get_output_fnames() ]:
            util.umask_fix_perms( path, self.app.config.umask, 0666, self.app.config.gid )
        self.sa_session.flush()
        log.debug( 'job %d ended' % self.job_id )
        self.cleanup()

    def cleanup( self ):
        # remove temporary files
        try:
            for fname in self.extra_filenames:
                os.remove( fname )
            if self.working_directory is not None:
                shutil.rmtree( self.working_directory )
            if self.app.config.set_metadata_externally:
                self.external_output_metadata.cleanup_external_metadata( self.sa_session )
            galaxy.tools.imp_exp.JobExportHistoryArchiveWrapper( self.job_id ).cleanup_after_job( self.sa_session )
        except:
            log.exception( "Unable to cleanup job %d" % self.job_id )

    def get_command_line( self ):
        return self.command_line

    def get_session_id( self ):
        return self.session_id

    def get_input_fnames( self ):
        job = self.sa_session.query( model.Job ).get( self.job_id )
        filenames = []
        for da in job.input_datasets: #da is JobToInputDatasetAssociation object
            if da.dataset:
                filenames.append( da.dataset.file_name )
                #we will need to stage in metadata file names also
                #TODO: would be better to only stage in metadata files that are actually needed (found in command line, referenced in config files, etc.)
                for key, value in da.dataset.metadata.items():
                    if isinstance( value, model.MetadataFile ):
                        filenames.append( value.file_name )
        return filenames

    def get_output_fnames( self ):
        if self.output_paths is not None:
            return self.output_paths

        class DatasetPath( object ):
            def __init__( self, dataset_id, real_path, false_path = None ):
                self.dataset_id = dataset_id
                self.real_path = real_path
                self.false_path = false_path
            def __str__( self ):
                if self.false_path is None:
                    return self.real_path
                else:
                    return self.false_path

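        # When outputs_to_working_directory is enabled, tools write to
        # false_path (a file inside the job's working directory) and finish()
        # or fail() later moves that file onto real_path, the dataset's final
        # location; otherwise only real_path is used.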
        job = self.sa_session.query( model.Job ).get( self.job_id )
        # Job output datasets are a combination of output datasets, library datasets, and jeha datasets.
        jeha = self.sa_session.query( model.JobExportHistoryArchive ).filter_by( job=job ).first()
        if self.app.config.outputs_to_working_directory:
            self.output_paths = []
            for name, data in [ ( da.name, da.dataset.dataset ) for da in job.output_datasets + job.output_library_datasets ]:
                false_path = os.path.abspath( os.path.join( self.working_directory, "galaxy_dataset_%d.dat" % data.id ) )
                self.output_paths.append( DatasetPath( data.id, data.file_name, false_path ) )
            if jeha:
                false_path = os.path.abspath( os.path.join( self.working_directory, "galaxy_dataset_%d.dat" % jeha.dataset.id ) )
                self.output_paths.append( DatasetPath( jeha.dataset.id, jeha.dataset.file_name, false_path ) )
        else:
            self.output_paths = [ DatasetPath( da.dataset.dataset.id, da.dataset.file_name ) for da in job.output_datasets + job.output_library_datasets ]
            if jeha:
                self.output_paths.append( DatasetPath( jeha.dataset.id, jeha.dataset.file_name ) )

        return self.output_paths

    def get_output_file_id( self, file ):
        if self.output_paths is None:
            self.get_output_fnames()
        for dp in self.output_paths:
            if self.app.config.outputs_to_working_directory and os.path.basename( dp.false_path ) == file:
                return dp.dataset_id
            elif os.path.basename( dp.real_path ) == file:
                return dp.dataset_id
        return None

    def get_tool_provided_job_metadata( self ):
        if self.tool_provided_job_metadata is not None:
            return self.tool_provided_job_metadata

        # Look for JSONified job metadata
        self.tool_provided_job_metadata = []
        meta_file = os.path.join( self.working_directory, TOOL_PROVIDED_JOB_METADATA_FILE )
        if os.path.exists( meta_file ):
            for line in open( meta_file, 'r' ):
                try:
                    line = from_json_string( line )
                    assert 'type' in line
                except:
                    log.exception( '(%s) Got JSON data from tool, but data is improperly formatted or no "type" key in data' % self.job_id )
                    log.debug( 'Offending data was: %s' % line )
                    continue
                # Set the dataset id if it's a dataset entry and isn't set.
                # This isn't insecure.  We loop the job's output datasets in
                # the finish method, so if a tool writes out metadata for a
                # dataset id that it doesn't own, it'll just be ignored.
                if line['type'] == 'dataset' and 'dataset_id' not in line:
                    try:
                        line['dataset_id'] = self.get_output_file_id( line['dataset'] )
                    except KeyError:
                        log.warning( '(%s) Tool provided job dataset-specific metadata without specifying a dataset' % self.job_id )
                        continue
                self.tool_provided_job_metadata.append( line )
        return self.tool_provided_job_metadata

    def get_dataset_finish_context( self, job_context, dataset ):
        for meta in self.get_tool_provided_job_metadata():
            if meta['type'] == 'dataset' and meta['dataset_id'] == dataset.id:
                return ExpressionContext( meta, job_context )
        return job_context

    def check_output_sizes( self ):
        sizes = []
        output_paths = self.get_output_fnames()
        for outfile in [ str( o ) for o in output_paths ]:
            sizes.append( ( outfile, os.stat( outfile ).st_size ) )
        return sizes

    def setup_external_metadata( self, exec_dir = None, tmp_dir = None, dataset_files_path = None, config_root = None, datatypes_config = None, set_extension = True, **kwds ):
        # extension could still be 'auto' if this is the upload tool.
        job = self.sa_session.query( model.Job ).get( self.job_id )
        if set_extension:
            for output_dataset_assoc in job.output_datasets:
                if output_dataset_assoc.dataset.ext == 'auto':
                    context = self.get_dataset_finish_context( dict(), output_dataset_assoc.dataset.dataset )
                    output_dataset_assoc.dataset.extension = context.get( 'ext', 'data' )
            self.sa_session.flush()
        if tmp_dir is None:
            #this dir should be relative to the exec_dir
            tmp_dir = self.app.config.new_file_path
        if dataset_files_path is None:
            dataset_files_path = self.app.model.Dataset.file_path
        if config_root is None:
            config_root = self.app.config.root
        if datatypes_config is None:
            datatypes_config = self.app.config.datatypes_config
        return self.external_output_metadata.setup_external_metadata( [ output_dataset_assoc.dataset for output_dataset_assoc in job.output_datasets ],
                                                                      self.sa_session,
                                                                      exec_dir = exec_dir,
                                                                      tmp_dir = tmp_dir,
                                                                      dataset_files_path = dataset_files_path,
                                                                      config_root = config_root,
                                                                      datatypes_config = datatypes_config,
                                                                      job_metadata = os.path.join( self.working_directory, TOOL_PROVIDED_JOB_METADATA_FILE ),
                                                                      **kwds )

    @property
    def user( self ):
        job = self.sa_session.query( model.Job ).get( self.job_id )
        if job.user is not None:
            return job.user.email
        elif job.galaxy_session is not None and job.galaxy_session.user is not None:
            return job.galaxy_session.user.email
        elif job.history is not None and job.history.user is not None:
            return job.history.user.email
        elif job.galaxy_session is not None:
            return 'anonymous@' + job.galaxy_session.remote_addr.split()[-1]
        else:
            return 'anonymous@unknown'

class DefaultJobDispatcher( object ):
    def __init__( self, app ):
        self.app = app
        self.job_runners = {}
        start_job_runners = ["local"]
        if app.config.start_job_runners is not None:
            start_job_runners.extend( app.config.start_job_runners.split(",") )
        for runner_name in start_job_runners:
            if runner_name == "local":
                import runners.local
                self.job_runners[runner_name] = runners.local.LocalJobRunner( app )
            elif runner_name == "pbs":
                import runners.pbs
                self.job_runners[runner_name] = runners.pbs.PBSJobRunner( app )
            elif runner_name == "sge":
                import runners.sge
                self.job_runners[runner_name] = runners.sge.SGEJobRunner( app )
            elif runner_name == "drmaa":
                import runners.drmaa
                self.job_runners[runner_name] = runners.drmaa.DRMAAJobRunner( app )
            else:
                log.error( "Unable to start unknown job runner: %s" %runner_name )

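    # A tool's job_runner (and a recovered job's job_runner_name) is a URL of
    # the form "<runner>://...", for example "local:///" or "pbs:///"; only the
    # part before the first ":" is used below to select an entry from
    # job_runners.  (The example URLs are illustrative; exact forms depend on
    # the runner configuration.)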
    def put( self, job_wrapper ):
        runner_name = ( job_wrapper.tool.job_runner.split(":", 1) )[0]
        log.debug( "dispatching job %d to %s runner" %( job_wrapper.job_id, runner_name ) )
        self.job_runners[runner_name].put( job_wrapper )

    def stop( self, job ):
        runner_name = ( job.job_runner_name.split(":", 1) )[0]
        log.debug( "stopping job %d in %s runner" %( job.id, runner_name ) )
        self.job_runners[runner_name].stop_job( job )

    def recover( self, job, job_wrapper ):
        runner_name = ( job.job_runner_name.split(":", 1) )[0]
        log.debug( "recovering job %d in %s runner" %( job.id, runner_name ) )
        self.job_runners[runner_name].recover( job, job_wrapper )

    def shutdown( self ):
        for runner in self.job_runners.itervalues():
            runner.shutdown()

class JobStopQueue( object ):
    """
    A queue for jobs which need to be terminated prematurely.
    """
    STOP_SIGNAL = object()
    def __init__( self, app, dispatcher ):
        self.app = app
        self.sa_session = app.model.context
        self.dispatcher = dispatcher

        # Keep track of the pid that started the job manager, only it
        # has valid threads
        self.parent_pid = os.getpid()
        # Contains new jobs. Note this is not used if track_jobs_in_database is True
        self.queue = Queue()

        # Contains jobs that are waiting (only use from monitor thread)
        self.waiting = []

        # Helper for interruptable sleep
        self.sleeper = Sleeper()
        self.running = True
        self.monitor_thread = threading.Thread( target=self.monitor )
        self.monitor_thread.start()
        log.info( "job stopper started" )

    def monitor( self ):
        """
        Continually iterate the waiting jobs, stopping any that are found.
        """
        # HACK: Delay until after forking, we need a way to do post fork notification!!!
        time.sleep( 10 )
        while self.running:
            try:
                self.monitor_step()
            except:
                log.exception( "Exception in monitor_step" )
            # Sleep
            self.sleeper.sleep( 1 )

    def monitor_step( self ):
        """
        Called repeatedly by `monitor` to stop jobs.
        """
        # Pull all new jobs from the queue at once
        jobs = []
        try:
            while 1:
                ( job_id, error_msg ) = self.queue.get_nowait()
                if job_id is self.STOP_SIGNAL:
                    return
                # Append to watch queue
                jobs.append( ( job_id, error_msg ) )
        except Empty:
            pass

        for job_id, error_msg in jobs:
            job = self.sa_session.query( model.Job ).get( job_id )
            self.sa_session.refresh( job )
            # if desired, error the job so we can inform the user.
            if error_msg is not None:
                job.state = job.states.ERROR
                job.info = error_msg
            else:
                job.state = job.states.DELETED
            self.sa_session.add( job )
            self.sa_session.flush()
            # if job is in JobQueue or FooJobRunner's put method,
            # job_runner_name will be unset and the job will be dequeued due to
            # state change above
            if job.job_runner_name is not None:
                # tell the dispatcher to stop the job
                self.dispatcher.stop( job )

    def put( self, job_id, error_msg=None ):
        self.queue.put( ( job_id, error_msg ) )

    def shutdown( self ):
        """Attempts to gracefully shut down the worker thread"""
        if self.parent_pid != os.getpid():
            # We're not the real job queue, do nothing
            return
        else:
            log.info( "sending stop signal to worker thread" )
            self.running = False
            self.queue.put( ( self.STOP_SIGNAL, None ) )
            self.sleeper.wake()
            log.info( "job stopper stopped" )

class NoopQueue( object ):
    """
    Implements the JobQueue / JobStopQueue interface but does nothing
    """
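    # JobManager substitutes this object for both queues when the
    # "enable_job_running" config option is off, so callers can invoke put()
    # and shutdown() unconditionally.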
    def put( self, *args ):
        return
    def shutdown( self ):
        return