#!/usr/bin/env python

"""
Runs Lastz
Written for Lastz v. 1.01.88.

usage: lastz_wrapper.py [options]
    --ref_name: The reference name to change all output matches to
    --ref_source: Whether the reference is cached or from the history
    --source_select: Whether to use pre-set or cached reference file
    --input1: The name of the reference file if using history or reference base name if using cached
    --input2: The reads file to align
    --ref_sequences: The number of sequences in the reference file if using one from history
    --pre_set_options: Which of the pre set options to use, if using pre-sets
    --strand: Which strand of the read to search, if specifying all parameters
    --seed: Seeding settings, if specifying all parameters
    --gfextend: Whether to perform gap-free extension of seed hits to HSPs (high scoring segment pairs), if specifying all parameters
    --chain: Whether to perform chaining of HSPs, if specifying all parameters
    --transition: Number of transitions to allow in each seed hit, if specifying all parameters
    --O: Gap opening penalty, if specifying all parameters
    --E: Gap extension penalty, if specifying all parameters
    --X: X-drop threshold, if specifying all parameters
    --Y: Y-drop threshold, if specifying all parameters
    --K: Threshold for HSPs, if specifying all parameters
    --L: Threshold for gapped alignments, if specifying all parameters
    --entropy: Whether to involve entropy when filtering HSPs, if specifying all parameters
    --identity_min: Minimum identity (don't report matches under this identity)
    --identity_max: Maximum identity (don't report matches above this identity)
    --coverage: The minimum coverage value (don't report matches covering less than this)
    --unmask: Whether to convert lowercase bases to uppercase
    --out_format: The format of the output file (sam, diffs, or tabular (general))
    --output: The name of the output file
    --lastzSeqsFileDir: Directory of local lastz_seqs.loc file
"""
---|
import optparse, os, subprocess, shutil, sys, tempfile, threading, time
# Python 2 stdlib queue module (renamed to "queue" in Python 3).
from Queue import Queue

# Galaxy's egg loader must be imported before requiring bx-python below.
from galaxy import eggs
import pkg_resources
pkg_resources.require( 'bx-python' )
from bx.seq.twobit import *
from bx.seq.fasta import FastaReader
from galaxy.util.bunch import Bunch
---|
| 44 | |
---|
# Sentinel placed on a queue to tell a worker thread to shut down.
STOP_SIGNAL = object()
# Number of concurrent lastz worker threads.
WORKERS = 4
# Maximum number of pending jobs in the lastz job queue (bounds memory use).
SLOTS = 128
---|
| 48 | |
---|
def stop_err( msg ):
    """Write the given message to stderr and terminate the script."""
    message = "%s" % msg
    sys.stderr.write( message )
    # Exit without an explicit status code, matching the historical behaviour.
    sys.exit()
---|
| 52 | |
---|
def stop_queues( lastz, combine_data ):
    """Shut down both queues after an error has been encountered.

    Pushes one STOP_SIGNAL per lastz worker thread, then a single
    STOP_SIGNAL to the combine-data queue.
    """
    worker_count = len( lastz.threads )
    for _ in range( worker_count ):
        lastz.put( STOP_SIGNAL, True )
    combine_data.put( STOP_SIGNAL, True )
---|
| 59 | |
---|
class BaseQueue( object ):
    """Thread-pool skeleton around a Queue; subclasses provide run_job()."""
    def __init__( self, num_threads, slots=-1 ):
        """Create the job queue (bounded when slots > 0) and start the workers."""
        self.queue = Queue( slots )
        self.threads = []
        for _ in range( num_threads ):
            t = threading.Thread( target=self.run_next )
            t.start()
            self.threads.append( t )
    def run_next( self ):
        """Worker loop: pull and run jobs until a STOP_SIGNAL arrives."""
        job = self.queue.get()
        while job is not STOP_SIGNAL:
            self.run_job( job )
            # Brief pause between jobs, as in the original implementation.
            time.sleep( 1 )
            job = self.queue.get()
        return self.shutdown()
    def run_job( self, job ):
        # Subclasses must override this; running the base version aborts.
        stop_err( 'Not Implemented' )
    def put( self, job, block=False ):
        """Enqueue a job, optionally blocking while the queue is full."""
        self.queue.put( job, block )
    def shutdown( self ):
        # Hook for subclass cleanup; the base class has nothing to release.
        return
---|
| 84 | |
---|
class LastzJobQueue( BaseQueue ):
    """
    A queue that runs commands in parallel. Blocking is done so the queue will
    not consume much memory.
    """
    def run_job( self, job ):
        """Run job.command in a shell.  Any stderr output is treated as fatal:
        both queues are stopped and the script exits.  On success the job is
        passed on to its combine-data queue for output concatenation."""
        proc = subprocess.Popen( args=job.command, shell=True, stderr=subprocess.PIPE )
        # communicate() drains stderr while waiting for the child to exit.
        # The previous wait()-then-read() sequence could deadlock if the child
        # filled the stderr pipe buffer before exiting (and called wait() twice).
        stderr = proc.communicate()[1]
        if stderr:
            stop_queues( self, job.combine_data_queue )
            stop_err( stderr )
        job.combine_data_queue.put( job )
---|
| 100 | |
---|
class CombineDataQueue( BaseQueue ):
    """
    A queue that concatenates files in serial. Blocking is not done since this
    queue is not expected to grow larger than the command queue.
    """
    def __init__( self, output_filename, num_threads=1 ):
        """Open the combined output file; a single worker keeps writes serial."""
        BaseQueue.__init__( self, num_threads )
        self.CHUNK_SIZE = 2**20 # 1Mb
        self.output_file = open( output_filename, 'wb' )
    def run_job( self, job ):
        """Append the job's output file to the combined output, then remove
        the job's temporary files."""
        # "with" guarantees the input file is closed even if a write fails;
        # copyfileobj replaces the previous manual chunked read/write loop.
        with open( job.output, 'rb' ) as in_file:
            shutil.copyfileobj( in_file, self.output_file, self.CHUNK_SIZE )
        for file_name in job.cleanup:
            os.remove( file_name )
    def shutdown( self ):
        # Close the combined output so buffered data is flushed to disk.
        self.output_file.close()
        return
---|
| 123 | |
---|
def __main__():
    """Parse the command line, build one lastz job per reference sequence
    (history FASTA) or per chromosome (cached 2bit), run the jobs through a
    thread pool, and concatenate all outputs into the final output file."""
    #Parse Command Line
    parser = optparse.OptionParser()
    parser.add_option( '', '--ref_name', dest='ref_name', help='The reference name to change all output matches to' )
    parser.add_option( '', '--ref_source', dest='ref_source', help='Whether the reference is cached or from the history' )
    parser.add_option( '', '--ref_sequences', dest='ref_sequences', help='Number of sequences in the reference dataset' )
    parser.add_option( '', '--source_select', dest='source_select', help='Whether to used pre-set or cached reference file' )
    parser.add_option( '', '--input1', dest='input1', help='The name of the reference file if using history or reference base name if using cached' )
    parser.add_option( '', '--input2', dest='input2', help='The reads file to align' )
    parser.add_option( '', '--pre_set_options', dest='pre_set_options', help='Which of the pre set options to use, if using pre-sets' )
    parser.add_option( '', '--strand', dest='strand', help='Which strand of the read to search, if specifying all parameters' )
    parser.add_option( '', '--seed', dest='seed', help='Seeding settings, if specifying all parameters' )
    parser.add_option( '', '--transition', dest='transition', help='Number of transitions to allow in each seed hit, if specifying all parameters' )
    parser.add_option( '', '--gfextend', dest='gfextend', help='Whether to perform gap-free extension of seed hits to HSPs (high scoring segment pairs), if specifying all parameters' )
    parser.add_option( '', '--chain', dest='chain', help='Whether to perform chaining of HSPs, if specifying all parameters' )
    parser.add_option( '', '--O', dest='O', help='Gap opening penalty, if specifying all parameters' )
    parser.add_option( '', '--E', dest='E', help='Gap extension penalty, if specifying all parameters' )
    parser.add_option( '', '--X', dest='X', help='X-drop threshold, if specifying all parameters' )
    parser.add_option( '', '--Y', dest='Y', help='Y-drop threshold, if specifying all parameters' )
    parser.add_option( '', '--K', dest='K', help='Threshold for HSPs, if specifying all parameters' )
    parser.add_option( '', '--L', dest='L', help='Threshold for gapped alignments, if specifying all parameters' )
    parser.add_option( '', '--entropy', dest='entropy', help='Whether to involve entropy when filtering HSPs, if specifying all parameters' )
    parser.add_option( '', '--identity_min', dest='identity_min', help="Minimum identity (don't report matches under this identity)" )
    parser.add_option( '', '--identity_max', dest='identity_max', help="Maximum identity (don't report matches above this identity)" )
    parser.add_option( '', '--coverage', dest='coverage', help="The minimum coverage value (don't report matches covering less than this)" )
    parser.add_option( '', '--unmask', dest='unmask', help='Whether to convert lowercase bases to uppercase' )
    parser.add_option( '', '--out_format', dest='format', help='The format of the output file (sam, diffs, or tabular (general))' )
    parser.add_option( '', '--output', dest='output', help='The output file' )
    parser.add_option( '', '--lastzSeqsFileDir', dest='lastzSeqsFileDir', help='Directory of local lastz_seqs.loc file' )
    ( options, args ) = parser.parse_args()

    # Translate simple options into lastz sequence-specifier modifiers.
    if options.unmask == 'yes':
        unmask = '[unmask]'
    else:
        unmask = ''
    if options.ref_name != 'None':
        ref_name = '[nickname=%s]' % options.ref_name
    else:
        ref_name = ''
    # Prepare for commonly-used preset options
    if options.source_select == 'pre_set':
        set_options = '--%s' % options.pre_set_options
    # Prepare for user-specified options
    else:
        set_options = '--%s --%s --gapped --strand=%s --seed=%s --%s O=%s E=%s X=%s Y=%s K=%s L=%s --%s' % \
                    ( options.gfextend, options.chain, options.strand, options.seed, options.transition,
                      options.O, options.E, options.X, options.Y, options.K, options.L, options.entropy )
    # Specify input2 and add [fullnames] modifier if output format is diffs
    if options.format == 'diffs':
        input2 = '%s[fullnames]' % options.input2
    else:
        input2 = options.input2
    if options.format == 'tabular':
        # Change output format to general if it's tabular and add field names for tabular output
        format = 'general-'
        tabular_fields = ':score,name1,strand1,size1,start1,zstart1,end1,length1,text1,name2,strand2,size2,start2,zstart2,end2,start2+,zstart2+,end2+,length2,text2,diff,cigar,identity,coverage,gaprate,diagonal,shingle'
    elif options.format == 'sam':
        # We currently ALWAYS suppress SAM headers.
        format = 'sam-'
        tabular_fields = ''
    else:
        format = options.format
        tabular_fields = ''

    # Set up our queues
    lastz_job_queue = LastzJobQueue( WORKERS, slots=SLOTS )
    combine_data_queue = CombineDataQueue( options.output )

    if options.ref_source == 'history':
        # Reference is a fasta dataset from the history, so split job across
        # the number of sequences in the dataset ( this could be a HUGE number )
        try:
            # Ensure there is at least 1 sequence in the dataset ( this may not be necessary ).
            error_msg = "The reference dataset is missing metadata, click the pencil icon in the history item and 'auto-detect' the metadata attributes."
            ref_sequences = int( options.ref_sequences )
            if ref_sequences < 1:
                stop_queues( lastz_job_queue, combine_data_queue )
                stop_err( error_msg )
        except Exception:
            # int() raises TypeError/ValueError when the metadata is missing
            # or malformed; the previous bare "except:" hid even SystemExit.
            stop_queues( lastz_job_queue, combine_data_queue )
            stop_err( error_msg )
        seqs = 0
        fasta_reader = FastaReader( open( options.input1 ) )
        while True:
            # Read the next sequence from the reference dataset
            seq = fasta_reader.next()
            if not seq:
                break
            seqs += 1
            # Create a temporary file to contain the current sequence as input to lastz
            tmp_in_fd, tmp_in_name = tempfile.mkstemp( suffix='.in' )
            tmp_in = os.fdopen( tmp_in_fd, 'wb' )
            # Write the current sequence to the temporary input file
            tmp_in.write( '>%s\n%s\n' % ( seq.name, seq.text ) )
            tmp_in.close()
            # Create a 2nd temporary file to contain the output from lastz execution on the current sequence
            tmp_out_fd, tmp_out_name = tempfile.mkstemp( suffix='.out' )
            os.close( tmp_out_fd )
            # Generate the command line for calling lastz on the current sequence
            command = 'lastz %s%s%s %s %s --ambiguousn --nolaj --identity=%s..%s --coverage=%s --format=%s%s > %s' % \
                    ( tmp_in_name, unmask, ref_name, input2, set_options, options.identity_min,
                      options.identity_max, options.coverage, format, tabular_fields, tmp_out_name )
            # Create a job object
            job = Bunch()
            job.command = command
            job.output = tmp_out_name
            job.cleanup = [ tmp_in_name, tmp_out_name ]
            job.combine_data_queue = combine_data_queue
            # Add another job to the lastz_job_queue. Execution
            # will wait at this point if the queue is full.
            lastz_job_queue.put( job, block=True )
        # Make sure the value of sequences in the metadata is the same as the
        # number of sequences read from the dataset ( this may not be necessary ).
        if ref_sequences != seqs:
            stop_queues( lastz_job_queue, combine_data_queue )
            stop_err( "The value of metadata.sequences (%d) differs from the number of sequences read from the reference (%d)." % ( ref_sequences, seqs ) )
    else:
        # Reference is a locally cached 2bit file, split job across number of chroms in 2bit file
        tbf = TwoBitFile( open( options.input1, 'r' ) )
        for chrom in tbf.keys():
            # Create a temporary file to contain the output from lastz execution on the current chrom
            tmp_out_fd, tmp_out_name = tempfile.mkstemp( suffix='.out' )
            os.close( tmp_out_fd )
            command = 'lastz %s/%s%s%s %s %s --ambiguousn --nolaj --identity=%s..%s --coverage=%s --format=%s%s >> %s' % \
                    ( options.input1, chrom, unmask, ref_name, input2, set_options, options.identity_min,
                      options.identity_max, options.coverage, format, tabular_fields, tmp_out_name )
            # Create a job object
            job = Bunch()
            job.command = command
            job.output = tmp_out_name
            job.cleanup = [ tmp_out_name ]
            job.combine_data_queue = combine_data_queue
            # Add another job to the lastz_job_queue. Execution
            # will wait at this point if the queue is full.
            lastz_job_queue.put( job, block=True )

    # Stop the lastz_job_queue by sending one STOP_SIGNAL per worker thread.
    for t in lastz_job_queue.threads:
        lastz_job_queue.put( STOP_SIGNAL, True )
    # Wait for every lastz worker to finish before shutting down the
    # combine_data_queue.  Joining the worker threads directly is more robust
    # than the previous "while threading.activeCount() > 2" poll, which broke
    # if any unrelated thread happened to be alive.
    for t in lastz_job_queue.threads:
        t.join()
    # Now it's safe to stop the combine_data_queue
    combine_data_queue.put( STOP_SIGNAL )
---|
| 270 | |
---|
if __name__ == "__main__":
    __main__()
---|