root/galaxy-central/lib/galaxy/jobs/runners/pbs.py @ 2

リビジョン 2, 26.1 KB (コミッタ: hatakeyama, 14 年 前)

import galaxy-central

行番号 
1import os, logging, threading, time
2from datetime import timedelta
3from Queue import Queue, Empty
4
5from galaxy import model
6from galaxy.datatypes.data import nice_size
7from galaxy.util.bunch import Bunch
8
9from paste.deploy.converters import asbool
10
11import pkg_resources
12
# pbs_python is an optional dependency.  Remember whether it could be loaded
# so that PBSJobRunner.__init__ can refuse to start with a clear message
# instead of this module failing at import time.
try:
    pkg_resources.require("pbs_python")
    pbs = __import__("pbs")
except Exception:
    # Any resolution or import problem simply means the runner is unavailable.
    # (A bare "except:" would also have swallowed SystemExit/KeyboardInterrupt.)
    pbs = None

log = logging.getLogger(__name__)
20
# Shell prologue shared by both job scripts.  Its single %s placeholder is the
# Galaxy lib directory, which is put in front of PYTHONPATH unless it is the
# literal string "None".
_pbs_pythonpath_prologue = (
    '#!/bin/sh\n'
    'GALAXY_LIB="%s"\n'
    'if [ "$GALAXY_LIB" != "None" ]; then\n'
    '    if [ -n "$PYTHONPATH" ]; then\n'
    '        export PYTHONPATH="$GALAXY_LIB:$PYTHONPATH"\n'
    '    else\n'
    '        export PYTHONPATH="$GALAXY_LIB"\n'
    '    fi\n'
    'fi\n'
)

# Placeholders, in order: Galaxy lib dir, working directory, command line.
pbs_template = _pbs_pythonpath_prologue + (
    'cd %s\n'
    '%s\n'
)

# Placeholders, in order: Galaxy lib dir, space separated dataset paths,
# staging directory the symlinks point into, working directory, command line.
pbs_symlink_template = _pbs_pythonpath_prologue + (
    'for dataset in %s; do\n'
    '    dir=`dirname $dataset`\n'
    '    file=`basename $dataset`\n'
    '    [ ! -d $dir ] && mkdir -p $dir\n'
    '    [ ! -e $dataset ] && ln -s %s/$file $dataset\n'
    'done\n'
    'cd %s\n'
    '%s\n'
)
52
# Meaning of the PBS job exit codes, from pbs' job.h
JOB_EXIT_STATUS = dict([
    (0, "job exec successful"),
    (-1, "job exec failed, before files, no retry"),
    (-2, "job exec failed, after files, no retry"),
    (-3, "job execution failed, do retry"),
    (-4, "job aborted on MOM initialization"),
    (-5, "job aborted on MOM init, chkpt, no migrate"),
    (-6, "job aborted on MOM init, chkpt, ok migrate"),
    (-7, "job restart failed"),
    (-8, "exec() of user command failed"),
])
65
class PBSJobState(object):
    """
    Encapsulates state related to a job that is being run via PBS and
    that we need to monitor.
    """
    def __init__(self):
        """Create an empty state record; the runner fills in the fields."""
        # Galaxy job wrapper this record belongs to
        self.job_wrapper = None
        # Job id as known to PBS
        self.job_id = None
        # PBS job state seen at the previous status check
        self.old_state = None
        # True once the Galaxy job has been flagged as running
        self.running = False
        # Paths of the job script and of the PBS stdout/stderr files
        self.job_file = None
        self.ofile = None
        self.efile = None
        # Runner URL of the job; the PBS server name is derived from it
        self.runner_url = None
        # Number of status checks performed for this job so far
        self.check_count = 0
        # When True, failing the job also deletes it from the PBS queue
        self.stop_job = False
        # Message handed to job_wrapper.fail(); the monitor sets it before
        # queueing a 'fail' operation.  Initialised here so that reading it
        # can never raise AttributeError.
        self.fail_message = None
82
83class PBSJobRunner( object ):
84    """
85    Job runner backed by a finite pool of worker threads. FIFO scheduling
86    """
87    STOP_SIGNAL = object()
88    def __init__( self, app ):
89        """Initialize this job runner and start the monitor thread"""
90        # Check if PBS was importable, fail if not
91        if pbs is None:
92            raise Exception( "PBSJobRunner requires pbs-python which was not found" )
93        if app.config.pbs_application_server and app.config.outputs_to_working_directory:
94            raise Exception( "pbs_application_server (file staging) and outputs_to_working_directory options are mutually exclusive" )
95        self.app = app
96        self.sa_session = app.model.context
97        # 'watched' and 'queue' are both used to keep track of jobs to watch.
98        # 'queue' is used to add new watched jobs, and can be called from
99        # any thread (usually by the 'queue_job' method). 'watched' must only
100        # be modified by the monitor thread, which will move items from 'queue'
101        # to 'watched' and then manage the watched jobs.
102        self.watched = []
103        self.monitor_queue = Queue()
104        # set the default server during startup
105        self.default_pbs_server = None
106        self.determine_pbs_server( 'pbs:///' )
107        self.job_walltime = None
108        if self.app.config.job_walltime is not None:
109            h, m, s = [ int( v ) for v in self.app.config.job_walltime.split( ':' ) ]
110            self.job_walltime = timedelta( 0, s, 0, 0, m, h )
111        self.monitor_thread = threading.Thread( target=self.monitor )
112        self.monitor_thread.start()
113        self.work_queue = Queue()
114        self.work_threads = []
115        nworkers = app.config.cluster_job_queue_workers
116        for i in range( nworkers ):
117            worker = threading.Thread( target=self.run_next )
118            worker.start()
119            self.work_threads.append( worker )
120        log.debug( "%d workers ready" % nworkers )
121
122    def determine_pbs_server( self, url, rewrite = False ):
123        """Determine what PBS server we are connecting to"""
124        url_split = url.split("/")
125        server = url_split[2]
126        if server == "":
127            if not self.default_pbs_server:
128                self.default_pbs_server = pbs.pbs_default()
129                log.debug( "Set default PBS server to %s" % self.default_pbs_server )
130            server = self.default_pbs_server
131            url_split[2] = server
132        if server is None:
133            raise Exception( "Could not find torque server" )
134        if rewrite:
135            return ( server, "/".join( url_split ) )
136        else:
137            return server
138
139    def determine_pbs_queue( self, url ):
140        """Determine what PBS queue we are submitting to"""
141        try:
142            return url.split('/')[3] or None
143        except:
144            return None
145
146    def determine_pbs_options( self, url ):
147        try:
148            opts = url.split('/')[4].strip().lstrip('-').split(' -')
149            assert opts != ['']
150        except:
151            return []
152        rval = []
153        for opt in opts:
154            name, value = opt.split( None, 1 )
155            if name == 'l':
156                resource_attrs = value.split(',')
157                for j, ( res, val ) in enumerate( [ a.split('=', 1) for a in resource_attrs ] ):
158                    rval.append( dict( name = pbs.ATTR_l, value = val, resource = res ) )
159            else:
160                rval.append( dict( name = getattr( pbs, 'ATTR_' + name ), value = value ) )
161        return rval
162
163    def run_next( self ):
164        """
165        Run the next item in the queue (a job waiting to run or finish )
166        """
167        while 1:
168            ( op, obj ) = self.work_queue.get()
169            if op is self.STOP_SIGNAL:
170                return
171            try:
172                if op == 'queue':
173                    self.queue_job( obj )
174                elif op == 'finish':
175                    self.finish_job( obj )
176                elif op == 'fail':
177                    self.fail_job( obj )
178            except:
179                log.exception( "Uncaught exception %sing job" % op )
180
181    def queue_job( self, job_wrapper ):
182        """Create PBS script for a job and submit it to the PBS queue"""
183
184        try:
185            job_wrapper.prepare()
186            command_line = job_wrapper.get_command_line()
187        except:
188            job_wrapper.fail( "failure preparing job", exception=True )
189            log.exception("failure running job %d" % job_wrapper.job_id)
190            return
191
192        runner_url = job_wrapper.tool.job_runner
193       
194        # This is silly, why would we queue a job with no command line?
195        if not command_line:
196            job_wrapper.finish( '', '' )
197            return
198       
199        # Check for deletion before we change state
200        if job_wrapper.get_state() == model.Job.states.DELETED:
201            log.debug( "Job %s deleted by user before it entered the PBS queue" % job_wrapper.job_id )
202            job_wrapper.cleanup()
203            return
204
205        ( pbs_server_name, runner_url ) = self.determine_pbs_server( runner_url, rewrite = True )
206        pbs_queue_name = self.determine_pbs_queue( runner_url )
207        pbs_options = self.determine_pbs_options( runner_url )
208        c = pbs.pbs_connect( pbs_server_name )
209        if c <= 0:
210            errno, text = pbs.error()
211            job_wrapper.fail( "Unable to queue job for execution.  Resubmitting the job may succeed." )
212            log.error( "Connection to PBS server for submit failed: %s: %s" % ( errno, text ) )
213            return
214
215        # define job attributes
216        ofile = "%s/%s.o" % (self.app.config.cluster_files_directory, job_wrapper.job_id)
217        efile = "%s/%s.e" % (self.app.config.cluster_files_directory, job_wrapper.job_id)
218
219        output_fnames = job_wrapper.get_output_fnames()
220       
221        # If an application server is set, we're staging
222        if self.app.config.pbs_application_server:
223            pbs_ofile = self.app.config.pbs_application_server + ':' + ofile
224            pbs_efile = self.app.config.pbs_application_server + ':' + efile
225            output_files = [ str( o ) for o in output_fnames ]
226            stagein = self.get_stage_in_out( job_wrapper.get_input_fnames() + output_files, symlink=True )
227            stageout = self.get_stage_in_out( output_files )
228            attrs = [
229                dict( name = pbs.ATTR_o, value = pbs_ofile ),
230                dict( name = pbs.ATTR_e, value = pbs_efile ),
231                dict( name = pbs.ATTR_stagein, value = stagein ),
232                dict( name = pbs.ATTR_stageout, value = stageout ),
233            ]
234        # If not, we're using NFS
235        else:
236            attrs = [
237                dict( name = pbs.ATTR_o, value = ofile ),
238                dict( name = pbs.ATTR_e, value = efile ),
239            ]
240
241        # define PBS job options
242        attrs.append( dict( name = pbs.ATTR_N, value = str( "%s_%s_%s" % ( job_wrapper.job_id, job_wrapper.tool.id, job_wrapper.user ) ) ) )
243        job_attrs = pbs.new_attropl( len( attrs ) + len( pbs_options ) )
244        for i, attr in enumerate( attrs + pbs_options ):
245            job_attrs[i].name = attr['name']
246            job_attrs[i].value = attr['value']
247            if 'resource' in attr:
248                job_attrs[i].resource = attr['resource']
249        exec_dir = os.path.abspath( job_wrapper.working_directory )
250
251        # write the job script
252        if self.app.config.pbs_stage_path != '':
253            script = pbs_symlink_template % (job_wrapper.galaxy_lib_dir, " ".join(job_wrapper.get_input_fnames() + output_files), self.app.config.pbs_stage_path, exec_dir, command_line)
254        else:
255            script = pbs_template % ( job_wrapper.galaxy_lib_dir, exec_dir, command_line )
256            if self.app.config.set_metadata_externally:
257                script += "cd %s\n" % os.path.abspath( os.getcwd() )
258                script += "%s\n" % job_wrapper.setup_external_metadata( exec_dir = os.path.abspath( os.getcwd() ),
259                                                                        tmp_dir = self.app.config.new_file_path,
260                                                                        dataset_files_path = self.app.model.Dataset.file_path,
261                                                                        output_fnames = output_fnames,
262                                                                        set_extension = False,
263                                                                        kwds = { 'overwrite' : False } ) #we don't want to overwrite metadata that was copied over in init_meta(), as per established behavior
264        job_file = "%s/%s.sh" % (self.app.config.cluster_files_directory, job_wrapper.job_id)
265        fh = file(job_file, "w")
266        fh.write(script)
267        fh.close()
268
269        # job was deleted while we were preparing it
270        if job_wrapper.get_state() == model.Job.states.DELETED:
271            log.debug( "Job %s deleted by user before it entered the PBS queue" % job_wrapper.job_id )
272            pbs.pbs_disconnect(c)
273            self.cleanup( ( ofile, efile, job_file ) )
274            job_wrapper.cleanup()
275            return
276
277        # submit
278        galaxy_job_id = job_wrapper.job_id
279        log.debug("(%s) submitting file %s" % ( galaxy_job_id, job_file ) )
280        log.debug("(%s) command is: %s" % ( galaxy_job_id, command_line ) )
281        job_id = pbs.pbs_submit(c, job_attrs, job_file, pbs_queue_name, None)
282        pbs.pbs_disconnect(c)
283
284        # check to see if it submitted
285        if not job_id:
286            errno, text = pbs.error()
287            log.debug( "(%s) pbs_submit failed, PBS error %d: %s" % (galaxy_job_id, errno, text) )
288            job_wrapper.fail( "Unable to run this job due to a cluster error" )
289            return
290
291        if pbs_queue_name is None:
292            log.debug("(%s) queued in default queue as %s" % (galaxy_job_id, job_id) )
293        else:
294            log.debug("(%s) queued in %s queue as %s" % (galaxy_job_id, pbs_queue_name, job_id) )
295
296        # store runner information for tracking if Galaxy restarts
297        job_wrapper.set_runner( runner_url, job_id )
298
299        # Store PBS related state information for job
300        pbs_job_state = PBSJobState()
301        pbs_job_state.job_wrapper = job_wrapper
302        pbs_job_state.job_id = job_id
303        pbs_job_state.ofile = ofile
304        pbs_job_state.efile = efile
305        pbs_job_state.job_file = job_file
306        pbs_job_state.old_state = 'N'
307        pbs_job_state.running = False
308        pbs_job_state.runner_url = runner_url
309       
310        # Add to our 'queue' of jobs to monitor
311        self.monitor_queue.put( pbs_job_state )
312
313    def monitor( self ):
314        """
315        Watches jobs currently in the PBS queue and deals with state changes
316        (queued to running) and job completion
317        """
318        while 1:
319            # Take any new watched jobs and put them on the monitor list
320            try:
321                while 1:
322                    pbs_job_state = self.monitor_queue.get_nowait()
323                    if pbs_job_state is self.STOP_SIGNAL:
324                        # TODO: This is where any cleanup would occur
325                        return
326                    self.watched.append( pbs_job_state )
327            except Empty:
328                pass
329            # Iterate over the list of watched jobs and check state
330            try:
331                self.check_watched_items()
332            except:
333                log.exception( "Uncaught exception checking jobs" )
334            # Sleep a bit before the next state check
335            time.sleep( 1 )
336           
337    def check_watched_items( self ):
338        """
339        Called by the monitor thread to look at each watched job and deal
340        with state changes.
341        """
342        new_watched = []
343        # reduce pbs load by batching status queries
344        ( failures, statuses ) = self.check_all_jobs()
345        for pbs_job_state in self.watched:
346            job_id = pbs_job_state.job_id
347            galaxy_job_id = pbs_job_state.job_wrapper.job_id
348            old_state = pbs_job_state.old_state
349            pbs_server_name = self.determine_pbs_server( pbs_job_state.runner_url )
350            if pbs_server_name in failures:
351                log.debug( "(%s/%s) Skipping state check because PBS server connection failed" % ( galaxy_job_id, job_id ) )
352                new_watched.append( pbs_job_state )
353                continue
354            try:
355                status = statuses[job_id]
356            except KeyError:
357                try:
358                    # Recheck to make sure it wasn't a communication problem
359                    self.check_single_job( pbs_server_name, job_id )
360                    log.warning( "(%s/%s) PBS job was not in state check list, but was found with individual state check" % ( galaxy_job_id, job_id ) )
361                    new_watched.append( pbs_job_state )
362                except:
363                    errno, text = pbs.error()
364                    if errno == 15001:
365                        # 15001 == job not in queue
366                        log.debug("(%s/%s) PBS job has left queue" % (galaxy_job_id, job_id) )
367                        self.work_queue.put( ( 'finish', pbs_job_state ) )
368                    else:
369                        # Unhandled error, continue to monitor
370                        log.info("(%s/%s) PBS state check resulted in error (%d): %s" % (galaxy_job_id, job_id, errno, text) )
371                        new_watched.append( pbs_job_state )
372                continue
373            if status.job_state != old_state:
374                log.debug("(%s/%s) PBS job state changed from %s to %s" % ( galaxy_job_id, job_id, old_state, status.job_state ) )
375            if status.job_state == "R" and not pbs_job_state.running:
376                pbs_job_state.running = True
377                pbs_job_state.job_wrapper.change_state( model.Job.states.RUNNING )
378            if status.job_state == "R" and ( pbs_job_state.check_count % 20 ) == 0:
379                # Every 20th time the job status is checked, do limit checks (if configured)
380                if self.app.config.output_size_limit > 0:
381                    # Check the size of the job outputs
382                    fail = False
383                    for outfile, size in pbs_job_state.job_wrapper.check_output_sizes():
384                        if size > self.app.config.output_size_limit:
385                            pbs_job_state.fail_message = 'Job output grew too large (greater than %s), please try different job parameters or' \
386                                % nice_size( self.app.config.output_size_limit )
387                            log.warning( '(%s/%s) Dequeueing job due to output %s growing larger than %s limit' \
388                                % ( galaxy_job_id, job_id, os.path.basename( outfile ), nice_size( self.app.config.output_size_limit ) ) )
389                            pbs_job_state.stop_job = True
390                            self.work_queue.put( ( 'fail', pbs_job_state ) )
391                            fail = True
392                            break
393                    if fail:
394                        continue
395                if self.job_walltime is not None:
396                    # Check the job's execution time
397                    if status.get( 'resources_used', False ):
398                        # resources_used may not be in the status for new jobs
399                        h, m, s = [ int( i ) for i in status.resources_used.walltime.split( ':' ) ]
400                        time_executing = timedelta( 0, s, 0, 0, m, h )
401                        if time_executing > self.job_walltime:
402                            pbs_job_state.fail_message = 'Job ran longer than maximum allowed execution time (%s), please try different job parameters or' \
403                                % self.app.config.job_walltime
404                            log.warning( '(%s/%s) Dequeueing job since walltime has been reached' \
405                                % ( galaxy_job_id, job_id ) )
406                            pbs_job_state.stop_job = True
407                            self.work_queue.put( ( 'fail', pbs_job_state ) )
408                            continue
409            elif status.job_state == "C":
410                # "keep_completed" is enabled in PBS, so try to check exit status
411                try:
412                    assert int( status.exit_status ) == 0
413                    log.debug("(%s/%s) PBS job has completed successfully" % ( galaxy_job_id, job_id ) )
414                except AssertionError:
415                    pbs_job_state.fail_message = 'Job cannot be completed due to a cluster error.  Please retry or'
416                    log.error( '(%s/%s) PBS job failed: %s' % ( galaxy_job_id, job_id, JOB_EXIT_STATUS.get( int( status.exit_status ), 'Unknown error: %s' % status.exit_status ) ) )
417                    self.work_queue.put( ( 'fail', pbs_job_state ) )
418                    continue
419                except AttributeError:
420                    # No exit_status, can't verify proper completion so we just have to assume success.
421                    log.debug("(%s/%s) PBS job has completed" % ( galaxy_job_id, job_id ) )
422                self.work_queue.put( ( 'finish', pbs_job_state ) )
423                continue
424            pbs_job_state.old_state = status.job_state
425            new_watched.append( pbs_job_state )
426        # Replace the watch list with the updated version
427        self.watched = new_watched
428       
429    def check_all_jobs( self ):
430        """
431        Returns a list of servers that failed to be contacted and a dict
432        of "job_id : status" pairs (where status is a bunchified version
433        of the API's structure.
434        """
435        servers = []
436        failures = []
437        statuses = {}
438        for pbs_job_state in self.watched:
439            pbs_server_name = self.determine_pbs_server( pbs_job_state.runner_url )
440            if pbs_server_name not in servers:
441                servers.append( pbs_server_name )
442            pbs_job_state.check_count += 1
443        for pbs_server_name in servers:
444            c = pbs.pbs_connect( pbs_server_name )
445            if c <= 0:
446                log.debug("connection to PBS server %s for state check failed" % pbs_server_name )
447                failures.append( pbs_server_name )
448                continue
449            stat_attrl = pbs.new_attrl(3)
450            stat_attrl[0].name = pbs.ATTR_state
451            stat_attrl[1].name = pbs.ATTR_used
452            stat_attrl[2].name = pbs.ATTR_exitstat
453            jobs = pbs.pbs_statjob( c, None, stat_attrl, None )
454            pbs.pbs_disconnect( c )
455            statuses.update( self.convert_statjob_to_bunches( jobs ) )
456        return( ( failures, statuses ) )
457
458    def convert_statjob_to_bunches( self, statjob_out ):
459        statuses = {}
460        for job in statjob_out:
461            status = {}
462            for attrib in job.attribs:
463                if attrib.resource is None:
464                    status[ attrib.name ] = attrib.value
465                else:
466                    if attrib.name not in status:
467                        status[ attrib.name ] = Bunch()
468                    status[ attrib.name ][ attrib.resource ] = attrib.value
469            statuses[ job.name ] = Bunch( **status )
470        return statuses
471
472    def check_single_job( self, pbs_server_name, job_id ):
473        """
474        Returns the state of a single job, used to make sure a job is
475        really dead.
476        """
477        c = pbs.pbs_connect( pbs_server_name )
478        if c <= 0:
479            log.debug("connection to PBS server %s for state check failed" % pbs_server_name )
480            return None
481        stat_attrl = pbs.new_attrl(1)
482        stat_attrl[0].name = pbs.ATTR_state
483        jobs = pbs.pbs_statjob( c, job_id, stat_attrl, None )
484        pbs.pbs_disconnect( c )
485        return jobs[0].attribs[0].value
486
487    def finish_job( self, pbs_job_state ):
488        """
489        Get the output/error for a finished job, pass to `job_wrapper.finish`
490        and cleanup all the PBS temporary files.
491        """
492        ofile = pbs_job_state.ofile
493        efile = pbs_job_state.efile
494        job_file = pbs_job_state.job_file
495        # collect the output
496        try:
497            ofh = file(ofile, "r")
498            efh = file(efile, "r")
499            stdout = ofh.read()
500            stderr = efh.read()
501        except:
502            stdout = ''
503            stderr = 'Job output not returned by PBS: the output datasets were deleted while the job was running, the job was manually dequeued or there was a cluster error.'
504            log.debug(stderr)
505
506        try:
507            pbs_job_state.job_wrapper.finish( stdout, stderr )
508        except:
509            log.exception("Job wrapper finish method failed")
510            pbs_job_state.job_wrapper.fail("Unable to finish job", exception=True)
511
512        # clean up the pbs files
513        self.cleanup( ( ofile, efile, job_file ) )
514
515    def fail_job( self, pbs_job_state ):
516        """
517        Seperated out so we can use the worker threads for it.
518        """
519        if pbs_job_state.stop_job:
520            self.stop_job( self.sa_session.query( self.app.model.Job ).get( pbs_job_state.job_wrapper.job_id ) )
521        pbs_job_state.job_wrapper.fail( pbs_job_state.fail_message )
522        self.cleanup( ( pbs_job_state.ofile, pbs_job_state.efile, pbs_job_state.job_file ) )
523
524    def cleanup( self, files ):
525        if not asbool( self.app.config.get( 'debug', False ) ):
526            for file in files:
527                if os.access( file, os.R_OK ):
528                    os.unlink( file )
529
530    def put( self, job_wrapper ):
531        """Add a job to the queue (by job identifier)"""
532        # Change to queued state before handing to worker thread so the runner won't pick it up again
533        job_wrapper.change_state( model.Job.states.QUEUED )
534        self.work_queue.put( ( 'queue', job_wrapper ) )
535   
536    def shutdown( self ):
537        """Attempts to gracefully shut down the monitor thread"""
538        log.info( "sending stop signal to worker threads" )
539        self.monitor_queue.put( self.STOP_SIGNAL )
540        for i in range( len( self.work_threads ) ):
541            self.work_queue.put( ( self.STOP_SIGNAL, None ) )
542        log.info( "pbs job runner stopped" )
543
544    def get_stage_in_out( self, fnames, symlink=False ):
545        """Convenience function to create a stagein/stageout list"""
546        stage = ''
547        for fname in fnames:
548            if os.access(fname, os.R_OK):
549                if stage:
550                    stage += ','
551                # pathnames are now absolute
552                if symlink and self.app.config.pbs_stage_path:
553                    stage_name = os.path.join(self.app.config.pbs_stage_path, os.path.split(fname)[1])
554                else:
555                    stage_name = fname
556                stage += "%s@%s:%s" % (stage_name, self.app.config.pbs_dataset_server, fname)
557        return stage
558
559    def stop_job( self, job ):
560        """Attempts to delete a job from the PBS queue"""
561        pbs_server_name = self.determine_pbs_server( str( job.job_runner_name ) )
562        c = pbs.pbs_connect( pbs_server_name )
563        if c <= 0:
564            log.debug("(%s/%s) Connection to PBS server for job delete failed" % ( job.id, job.job_runner_external_id ) )
565            return
566        pbs.pbs_deljob( c, str( job.job_runner_external_id ), 'NULL' )
567        pbs.pbs_disconnect( c )
568        log.debug( "(%s/%s) Removed from PBS queue before job completion" % ( job.id, job.job_runner_external_id ) )
569
570    def recover( self, job, job_wrapper ):
571        """Recovers jobs stuck in the queued/running state when Galaxy started"""
572        pbs_job_state = PBSJobState()
573        pbs_job_state.ofile = "%s/%s.o" % (self.app.config.cluster_files_directory, job.id)
574        pbs_job_state.efile = "%s/%s.e" % (self.app.config.cluster_files_directory, job.id)
575        pbs_job_state.job_file = "%s/%s.sh" % (self.app.config.cluster_files_directory, job.id)
576        pbs_job_state.job_id = str( job.job_runner_external_id )
577        pbs_job_state.runner_url = job_wrapper.tool.job_runner
578        job_wrapper.command_line = job.command_line
579        pbs_job_state.job_wrapper = job_wrapper
580        if job.state == model.Job.states.RUNNING:
581            log.debug( "(%s/%s) is still in running state, adding to the PBS queue" % ( job.id, job.job_runner_external_id ) )
582            pbs_job_state.old_state = 'R'
583            pbs_job_state.running = True
584            self.monitor_queue.put( pbs_job_state )
585        elif job.state == model.Job.states.QUEUED:
586            log.debug( "(%s/%s) is still in PBS queued state, adding to the PBS queue" % ( job.id, job.job_runner_external_id ) )
587            pbs_job_state.old_state = 'Q'
588            pbs_job_state.running = False
589            self.monitor_queue.put( pbs_job_state )
Note: リポジトリブラウザについてのヘルプは TracBrowser を参照してください。