root/galaxy-central/lib/galaxy/jobs/runners/sge.py

リビジョン 2, 16.1 KB (コミッタ: hatakeyama, 14 年 前)

import galaxy-central

行番号 
1import os, logging, threading, time
2from Queue import Queue, Empty
3
4from galaxy import model
5from paste.deploy.converters import asbool
6
7import pkg_resources
8
# DRMAA_python is an optional dependency.  When the egg (or the library it
# wraps) cannot be loaded, DRMAA is left as None and SGEJobRunner refuses to
# start.  Only Exception is caught so that KeyboardInterrupt / SystemExit
# raised during start-up are not silently swallowed.
try:
    pkg_resources.require( "DRMAA_python" )
    DRMAA = __import__( "DRMAA" )
except Exception:
    DRMAA = None

log = logging.getLogger( __name__ )

# Human readable descriptions of the DRMAA job states, used for logging state
# changes.  Only defined when DRMAA could be imported.
if DRMAA is not None:
    DRMAA_state = {
        DRMAA.Session.UNDETERMINED: 'process status cannot be determined',
        DRMAA.Session.QUEUED_ACTIVE: 'job is queued and waiting to be scheduled',
        DRMAA.Session.SYSTEM_ON_HOLD: 'job is queued and in system hold',
        DRMAA.Session.USER_ON_HOLD: 'job is queued and in user hold',
        DRMAA.Session.USER_SYSTEM_ON_HOLD: 'job is queued and in user and system hold',
        DRMAA.Session.RUNNING: 'job is running',
        DRMAA.Session.SYSTEM_SUSPENDED: 'job is system suspended',
        DRMAA.Session.USER_SUSPENDED: 'job is user suspended',
        DRMAA.Session.DONE: 'job finished normally',
        DRMAA.Session.FAILED: 'job finished, but failed',
    }

# Shell script submitted to SGE.  The three substitutions are, in order: the
# Galaxy lib directory (prepended to PYTHONPATH unless it is the string
# "None"), the job working directory, and the command line to execute.
sge_template = """#!/bin/sh
#$ -S /bin/sh
GALAXY_LIB="%s"
if [ "$GALAXY_LIB" != "None" ]; then
    if [ -n "$PYTHONPATH" ]; then
        PYTHONPATH="$GALAXY_LIB:$PYTHONPATH"
    else
        PYTHONPATH="$GALAXY_LIB"
    fi
    export PYTHONPATH
fi
cd %s
%s
"""
45
class SGEJobState( object ):
    """
    Encapsulates state related to a job that is being run via SGE and
    that we need to monitor.
    """
    def __init__( self ):
        """Create an empty state record; the runner fills in the fields."""
        # Galaxy job wrapper for the job being tracked
        self.job_wrapper = None
        # Identifier of the job on the SGE side
        self.job_id = None
        # Last state seen for the job, used to detect state changes
        self.old_state = None
        # True once the job has been flagged as running on the Galaxy side
        self.running = False
        # Path of the shell script submitted to SGE
        self.job_file = None
        # Paths of the files that receive the job's stdout / stderr
        self.ofile = None
        self.efile = None
        # Runner URL the job was submitted with
        self.runner_url = None
        # Message handed to the job wrapper when the job has to be failed.
        # Declared here so that reading it never raises AttributeError.
        self.fail_message = None
60
class SGEJobRunner( object ):
    """
    Job runner that submits Galaxy jobs to SGE through DRMAA.

    Work items (queue / finish / fail) are handled by a finite pool of worker
    threads reading a FIFO queue; a single monitor thread polls the state of
    every submitted job.
    """
    STOP_SIGNAL = object()
    # Permissions of the generated job script (rwxr-x---).  Spelled through
    # int() because neither octal literal form (0750 / 0o750) is accepted by
    # every Python version.
    JOB_SCRIPT_MODE = int( '750', 8 )

    def __init__( self, app ):
        """
        Initialize this job runner and start the monitor and worker threads.

        Raises Exception when DRMAA_python could not be imported.
        """
        # Check if SGE was importable, fail if not
        if DRMAA is None:
            raise Exception( "SGEJobRunner requires DRMAA_python which was not found" )
        self.app = app
        self.sa_session = app.model.context
        # 'watched' and 'queue' are both used to keep track of jobs to watch.
        # 'queue' is used to add new watched jobs, and can be called from
        # any thread (usually by the 'queue_job' method). 'watched' must only
        # be modified by the monitor thread, which will move items from 'queue'
        # to 'watched' and then manage the watched jobs.
        self.watched = []
        self.monitor_queue = Queue()
        # The monitor thread pushes onto work_queue, so it has to exist
        # before that thread is started.
        self.work_queue = Queue()
        self.work_threads = []
        self.default_cell = self.determine_sge_cell( self.app.config.default_cluster_job_runner )
        self.ds = DRMAA.Session()
        self.ds.init( self.default_cell )
        self.monitor_thread = threading.Thread( target=self.monitor )
        self.monitor_thread.start()
        nworkers = app.config.cluster_job_queue_workers
        for i in range( nworkers ):
            worker = threading.Thread( target=self.run_next )
            worker.start()
            self.work_threads.append( worker )
        log.debug( "%d workers ready" % nworkers )

    def _url_component( self, url, index ):
        """
        Return the `index`-th '/'-separated component of a runner URL, or
        None when the URL is not a string, is too short, or the component
        is empty.
        """
        try:
            return url.split( '/' )[ index ] or None
        except ( AttributeError, IndexError ):
            return None

    def _read_file( self, path ):
        """Return the whole content of `path`, always closing the file."""
        fh = open( path, "r" )
        try:
            return fh.read()
        finally:
            fh.close()

    def determine_sge_cell( self, url ):
        """
        Determine what SGE cell we are using.

        Returns the cell component of an 'sge://cell/...' URL, or '' when
        the URL is empty, is not an SGE URL, or carries no cell component.
        """
        if not url:
            return ''
        url_split = url.split( "/" )
        if url_split[0] == 'sge:' and len( url_split ) > 2:
            return url_split[2]
        # this could happen if sge is started, but is not the default runner
        return ''

    def determine_sge_queue( self, url ):
        """Determine what SGE queue we are submitting to (None if unset)"""
        return self._url_component( url, 3 )

    def determine_sge_project( self, url ):
        """Determine what SGE project we are submitting to (None if unset)"""
        return self._url_component( url, 4 )

    def determine_sge_tool_parameters( self, url ):
        """Determine what are the tool's specific parameters (None if unset)"""
        return self._url_component( url, 5 )

    def run_next( self ):
        """
        Run the next item in the queue (a job waiting to run or finish ).

        Loops until STOP_SIGNAL is received.  Exceptions raised while
        handling an item are logged so that the worker thread survives.
        """
        while True:
            ( op, obj ) = self.work_queue.get()
            if op is self.STOP_SIGNAL:
                return
            try:
                if op == 'queue':
                    self.queue_job( obj )
                elif op == 'finish':
                    self.finish_job( obj )
                elif op == 'fail':
                    self.fail_job( obj )
            except Exception:
                log.exception( "Uncaught exception %sing job" % op )

    def queue_job( self, job_wrapper ):
        """
        Create SGE script for a job and submit it to the SGE queue.

        On success the job is handed to the monitor thread.  An exception
        raised by the DRMAA submission propagates to the caller.
        """
        try:
            job_wrapper.prepare()
            command_line = job_wrapper.get_command_line()
        except Exception:
            job_wrapper.fail( "failure preparing job", exception=True )
            log.exception("failure running job %d" % job_wrapper.job_id)
            return

        runner_url = job_wrapper.tool.job_runner

        # This is silly, why would we queue a job with no command line?
        if not command_line:
            job_wrapper.finish( '', '' )
            return

        # Check for deletion before we change state
        if job_wrapper.get_state() == model.Job.states.DELETED:
            log.debug( "Job %s deleted by user before it entered the SGE queue" % job_wrapper.job_id )
            job_wrapper.cleanup()
            return

        # Change to queued state immediately
        job_wrapper.change_state( model.Job.states.QUEUED )

        if self.determine_sge_cell( runner_url ) != self.default_cell:
            # TODO: support multiple cells
            log.warning( "(%s) Using multiple SGE cells is not supported.  This job will be submitted to the default cell." % job_wrapper.job_id )
        sge_queue_name = self.determine_sge_queue( runner_url )
        sge_project_name = self.determine_sge_project( runner_url )
        sge_extra_params = self.determine_sge_tool_parameters( runner_url )

        # define job attributes
        cwd = os.getcwd()
        ofile = "%s/database/pbs/%s.o" % ( cwd, job_wrapper.job_id )
        efile = "%s/database/pbs/%s.e" % ( cwd, job_wrapper.job_id )
        job_file = "%s/database/pbs/galaxy_%s.sh" % ( cwd, job_wrapper.job_id )

        jt = self.ds.createJobTemplate()
        # The template is released in the 'finally' clause on every exit
        # path: normal submission, job deleted meanwhile, or an exception.
        try:
            jt.remoteCommand = job_file
            jt.outputPath = ":%s" % ofile
            jt.errorPath = ":%s" % efile
            nativeSpec = []
            if sge_queue_name is not None:
                nativeSpec.append( "-q '%s'" % sge_queue_name )
            if sge_project_name is not None:
                nativeSpec.append( "-P '%s'" % sge_project_name )
            if sge_extra_params is not None:
                nativeSpec.append( sge_extra_params )
            if nativeSpec:
                jt.nativeSpecification = ' '.join( nativeSpec )

            script = sge_template % ( job_wrapper.galaxy_lib_dir, os.path.abspath( job_wrapper.working_directory ), command_line )
            if self.app.config.set_metadata_externally:
                script += "cd %s\n" % os.path.abspath( cwd )
                # we don't want to overwrite metadata that was copied over in
                # init_meta(), as per established behavior
                script += "%s\n" % job_wrapper.setup_external_metadata( exec_dir = os.path.abspath( cwd ),
                                                                        tmp_dir = self.app.config.new_file_path,
                                                                        dataset_files_path = self.app.model.Dataset.file_path,
                                                                        output_fnames = job_wrapper.get_output_fnames(),
                                                                        set_extension = False,
                                                                        kwds = { 'overwrite' : False } )
            fh = open( job_file, "w" )
            try:
                fh.write( script )
            finally:
                fh.close()
            os.chmod( job_file, self.JOB_SCRIPT_MODE )

            # job was deleted while we were preparing it
            if job_wrapper.get_state() == model.Job.states.DELETED:
                log.debug( "Job %s deleted by user before it entered the SGE queue" % job_wrapper.job_id )
                self.cleanup( ( ofile, efile, job_file ) )
                job_wrapper.cleanup()
                return

            galaxy_job_id = job_wrapper.job_id
            log.debug("(%s) submitting file %s" % ( galaxy_job_id, job_file ) )
            log.debug("(%s) command is: %s" % ( galaxy_job_id, command_line ) )
            # runJob will raise if there's a submit problem
            # NOTE(review): in that case the job stays in the QUEUED state and
            # its script file is left behind -- confirm whether the job
            # should be failed here instead.
            job_id = self.ds.runJob( jt )
        finally:
            # delete the job template
            self.ds.deleteJobTemplate( jt )

        if sge_queue_name is None:
            log.debug("(%s) queued in default queue as %s" % (galaxy_job_id, job_id) )
        else:
            log.debug("(%s) queued in %s queue as %s" % (galaxy_job_id, sge_queue_name, job_id) )

        # store runner information for tracking if Galaxy restarts
        job_wrapper.set_runner( runner_url, job_id )

        # Store SGE related state information for job
        sge_job_state = SGEJobState()
        sge_job_state.job_wrapper = job_wrapper
        sge_job_state.job_id = job_id
        sge_job_state.ofile = ofile
        sge_job_state.efile = efile
        sge_job_state.job_file = job_file
        sge_job_state.old_state = 'new'
        sge_job_state.running = False
        sge_job_state.runner_url = runner_url

        # Add to our 'queue' of jobs to monitor
        self.monitor_queue.put( sge_job_state )

    def monitor( self ):
        """
        Watches jobs currently in the SGE queue and deals with state changes
        (queued to running) and job completion.

        Runs until STOP_SIGNAL is read from the monitor queue, at which
        point the DRMAA session is closed.
        """
        while True:
            # Take any new watched jobs and put them on the monitor list
            try:
                while True:
                    sge_job_state = self.monitor_queue.get_nowait()
                    if sge_job_state is self.STOP_SIGNAL:
                        # TODO: This is where any cleanup would occur
                        self.ds.exit()
                        return
                    self.watched.append( sge_job_state )
            except Empty:
                pass
            # Iterate over the list of watched jobs and check state
            self.check_watched_items()
            # Sleep a bit before the next state check
            time.sleep( 1 )

    def check_watched_items( self ):
        """
        Called by the monitor thread to look at each watched job and deal
        with state changes.

        Jobs that finished, left the queue or could not be checked are
        handed to the worker threads and dropped from the watch list.
        """
        new_watched = []
        for sge_job_state in self.watched:
            job_id = sge_job_state.job_id
            galaxy_job_id = sge_job_state.job_wrapper.job_id
            old_state = sge_job_state.old_state
            try:
                state = self.ds.getJobProgramStatus( job_id )
            except DRMAA.InvalidJobError:
                # we should only get here if an orphaned job was put into the queue at app startup
                log.debug("(%s/%s) job left SGE queue" % ( galaxy_job_id, job_id ) )
                self.work_queue.put( ( 'finish', sge_job_state ) )
                continue
            except Exception:
                # so we don't kill the monitor thread
                log.exception("(%s/%s) Unable to check job status" % ( galaxy_job_id, job_id ) )
                log.warning("(%s/%s) job will now be errored" % ( galaxy_job_id, job_id ) )
                sge_job_state.fail_message = "Cluster could not complete job"
                self.work_queue.put( ( 'fail', sge_job_state ) )
                continue
            if state != old_state:
                # .get() so that a state missing from the table cannot raise
                # KeyError and take the monitor thread down
                log.debug("(%s/%s) state change: %s" % ( galaxy_job_id, job_id, DRMAA_state.get( state, state ) ) )
            if state == DRMAA.Session.RUNNING and not sge_job_state.running:
                sge_job_state.running = True
                sge_job_state.job_wrapper.change_state( model.Job.states.RUNNING )
            if state in ( DRMAA.Session.DONE, DRMAA.Session.FAILED ):
                self.work_queue.put( ( 'finish', sge_job_state ) )
                continue
            sge_job_state.old_state = state
            new_watched.append( sge_job_state )
        # Replace the watch list with the updated version
        self.watched = new_watched

    def finish_job( self, sge_job_state ):
        """
        Get the output/error for a finished job, pass to `job_wrapper.finish`
        and cleanup all the SGE temporary files.

        When either output file cannot be read, an empty stdout and a fixed
        error message are reported instead.
        """
        ofile = sge_job_state.ofile
        efile = sge_job_state.efile
        job_file = sge_job_state.job_file
        # collect the output
        try:
            stdout = self._read_file( ofile )
            stderr = self._read_file( efile )
        except Exception:
            stdout = ''
            stderr = 'Job output not returned from cluster'
            log.debug(stderr)

        try:
            sge_job_state.job_wrapper.finish( stdout, stderr )
        except Exception:
            log.exception("Job wrapper finish method failed")

        # clean up the sge files
        self.cleanup( ( ofile, efile, job_file ) )

    def fail_job( self, sge_job_state ):
        """
        Stop a job on the cluster, mark it as failed and remove its files.

        Separated out so we can use the worker threads for it.
        """
        # A failure to stop the job must not prevent it from being failed
        # on the Galaxy side.
        try:
            self.stop_job( self.sa_session.query( self.app.model.Job ).get( sge_job_state.job_wrapper.job_id ) )
        except Exception:
            log.exception( "(%s/%s) Unable to stop job" % ( sge_job_state.job_wrapper.job_id, sge_job_state.job_id ) )
        sge_job_state.job_wrapper.fail( sge_job_state.fail_message )
        self.cleanup( ( sge_job_state.ofile, sge_job_state.efile, sge_job_state.job_file ) )

    def cleanup( self, files ):
        """
        Remove the given job files unless the 'debug' config option is set.

        Files that are not readable are skipped, and a failed removal is
        logged rather than raised.
        """
        if asbool( self.app.config.get( 'debug', False ) ):
            return
        for path in files:
            if not os.access( path, os.R_OK ):
                continue
            try:
                os.unlink( path )
            except OSError:
                # e.g. the file disappeared between the check and the unlink
                log.warning( "Unable to remove job file %s" % path )

    def put( self, job_wrapper ):
        """Add a job to the queue (by job identifier)"""
        # Change to queued state before handing to worker thread so the runner won't pick it up again
        job_wrapper.change_state( model.Job.states.QUEUED )
        self.work_queue.put( ( 'queue', job_wrapper ) )

    def shutdown( self ):
        """Attempts to gracefully shut down the monitor and worker threads"""
        log.info( "sending stop signal to worker threads" )
        self.monitor_queue.put( self.STOP_SIGNAL )
        # one stop signal per worker thread
        for worker in self.work_threads:
            self.work_queue.put( ( self.STOP_SIGNAL, None ) )
        log.info( "sge job runner stopped" )

    def stop_job( self, job ):
        """
        Attempts to delete a job from the SGE queue.

        A job that SGE no longer knows about is only logged; other errors
        propagate to the caller.
        """
        try:
            self.ds.control( job.job_runner_external_id, DRMAA.Session.TERMINATE )
            log.debug( "(%s/%s) Removed from SGE queue at user's request" % ( job.id, job.job_runner_external_id ) )
        except DRMAA.InvalidJobError:
            log.debug( "(%s/%s) User killed running job, but it was already dead" % ( job.id, job.job_runner_external_id ) )

    def recover( self, job, job_wrapper ):
        """
        Recovers jobs stuck in the queued/running state when Galaxy started.

        Rebuilds the SGE state of the job and hands it to the monitor
        thread; jobs in any other state are ignored.
        """
        cwd = os.getcwd()
        sge_job_state = SGEJobState()
        sge_job_state.ofile = "%s/database/pbs/%s.o" % ( cwd, job.id )
        sge_job_state.efile = "%s/database/pbs/%s.e" % ( cwd, job.id )
        sge_job_state.job_file = "%s/database/pbs/galaxy_%s.sh" % ( cwd, job.id )
        sge_job_state.job_id = str( job.job_runner_external_id )
        sge_job_state.runner_url = job_wrapper.tool.job_runner
        job_wrapper.command_line = job.command_line
        sge_job_state.job_wrapper = job_wrapper
        if job.state == model.Job.states.RUNNING:
            log.debug( "(%s/%s) is still in running state, adding to the SGE queue" % ( job.id, job.job_runner_external_id ) )
            sge_job_state.old_state = DRMAA.Session.RUNNING
            sge_job_state.running = True
            self.monitor_queue.put( sge_job_state )
        elif job.state == model.Job.states.QUEUED:
            log.debug( "(%s/%s) is still in SGE queued state, adding to the SGE queue" % ( job.id, job.job_runner_external_id ) )
            sge_job_state.old_state = DRMAA.Session.QUEUED_ACTIVE
            sge_job_state.running = False
            self.monitor_queue.put( sge_job_state )
Note: リポジトリブラウザについてのヘルプは TracBrowser を参照してください。