#!/usr/bin/env python

3 | """ |
---|
4 | Runs Lastz |
---|
5 | Written for Lastz v. 1.01.88. |
---|
6 | |
---|
7 | usage: lastz_wrapper.py [options] |
---|
8 | --ref_name: The reference name to change all output matches to |
---|
9 | --ref_source: Whether the reference is cached or from the history |
---|
10 | --source_select: Whether to used pre-set or cached reference file |
---|
11 | --input1: The name of the reference file if using history or reference base name if using cached |
---|
12 | --input2: The reads file to align |
---|
13 | --ref_sequences: The number of sequences in the reference file if using one from history |
---|
14 | --pre_set_options: Which of the pre set options to use, if using pre-sets |
---|
15 | --strand: Which strand of the read to search, if specifying all parameters |
---|
16 | --seed: Seeding settings, if specifying all parameters |
---|
17 | --gfextend: Whether to perform gap-free extension of seed hits to HSPs (high scoring segment pairs), if specifying all parameters |
---|
18 | --chain: Whether to perform chaining of HSPs, if specifying all parameters |
---|
19 | --transition: Number of transitions to allow in each seed hit, if specifying all parameters |
---|
20 | --O: Gap opening penalty, if specifying all parameters |
---|
21 | --E: Gap extension penalty, if specifying all parameters |
---|
22 | --X: X-drop threshold, if specifying all parameters |
---|
23 | --Y: Y-drop threshold, if specifying all parameters |
---|
24 | --K: Threshold for HSPs, if specifying all parameters |
---|
25 | --L: Threshold for gapped alignments, if specifying all parameters |
---|
26 | --entropy: Whether to involve entropy when filtering HSPs, if specifying all parameters |
---|
27 | --identity_min: Minimum identity (don't report matches under this identity) |
---|
28 | --identity_max: Maximum identity (don't report matches above this identity) |
---|
29 | --coverage: The minimum coverage value (don't report matches covering less than this) |
---|
30 | --unmask: Whether to convert lowercase bases to uppercase |
---|
31 | --out_format: The format of the output file (sam, diffs, or tabular (general)) |
---|
32 | --output: The name of the output file |
---|
33 | --lastzSeqsFileDir: Directory of local lastz_seqs.loc file |
---|
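
Example invocation ( illustrative only; the file names, preset name, and
option values below are hypothetical ):
    lastz_wrapper.py --ref_source=history --input1=ref.fasta --input2=reads.fasta \
        --ref_sequences=2 --source_select=pre_set --pre_set_options=yasra95short \
        --ref_name=None --identity_min=90 --identity_max=100 --coverage=0 \
        --unmask=yes --out_format=sam --output=out.sam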
34 | """ |
---|
import optparse, os, subprocess, sys, tempfile, threading, time
from Queue import Queue

from galaxy import eggs
import pkg_resources
pkg_resources.require( 'bx-python' )
from bx.seq.twobit import TwoBitFile
from bx.seq.fasta import FastaReader
from galaxy.util.bunch import Bunch

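# STOP_SIGNAL is a unique sentinel placed on a queue to tell its worker
# threads to exit.  WORKERS is the number of lastz jobs run concurrently;
# SLOTS caps the number of queued jobs before put() blocks.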
STOP_SIGNAL = object()
WORKERS = 4
SLOTS = 128

def stop_err( msg ):
    sys.stderr.write( "%s" % msg )
    sys.exit( 1 )

def stop_queues( lastz, combine_data ):
    # This function should only be called if an error has been encountered.
    # Send a STOP_SIGNAL to every worker thread on both queues.
    for t in lastz.threads:
        lastz.put( STOP_SIGNAL, True )
    combine_data.put( STOP_SIGNAL, True )

class BaseQueue( object ):
    def __init__( self, num_threads, slots=-1 ):
        # Initialize the queue and worker threads
        self.queue = Queue( slots )
        self.threads = []
        for i in range( num_threads ):
            worker = threading.Thread( target=self.run_next )
            worker.start()
            self.threads.append( worker )
    def run_next( self ):
        # Run the next job, waiting until one is available if necessary
        while True:
            job = self.queue.get()
            if job is STOP_SIGNAL:
                return self.shutdown()
            self.run_job( job )
            time.sleep( 1 )
    def run_job( self, job ):
        stop_err( 'Not Implemented' )
    def put( self, job, block=False ):
        # Add a job to the queue
        self.queue.put( job, block )
    def shutdown( self ):
        return

class LastzJobQueue( BaseQueue ):
    """
    A queue that runs commands in parallel.  Puts onto the queue block when it
    is full, so the queue will not consume much memory.
    """
    def run_job( self, job ):
        # Execute the job's command, reading stderr in full before waiting so
        # that a filled pipe buffer cannot deadlock the child process.
        proc = subprocess.Popen( args=job.command, shell=True, stderr=subprocess.PIPE )
        stderr = proc.stderr.read()
        proc.wait()
        if stderr:
            stop_queues( self, job.combine_data_queue )
            stop_err( stderr )
        job.combine_data_queue.put( job )

class CombineDataQueue( BaseQueue ):
    """
    A queue that concatenates files serially.  Puts do not block, since this
    queue is not expected to grow larger than the command queue.
    """
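    # Note: chunks are written in job-completion order, so the combined output
    # may not preserve the order of sequences in the reference.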
    def __init__( self, output_filename, num_threads=1 ):
        BaseQueue.__init__( self, num_threads )
        self.CHUNK_SIZE = 2**20 # 1 MB
        self.output_file = open( output_filename, 'wb' )
    def run_job( self, job ):
        in_file = open( job.output, 'rb' )
        while True:
            chunk = in_file.read( self.CHUNK_SIZE )
            if not chunk:
                in_file.close()
                break
            self.output_file.write( chunk )
        for file_name in job.cleanup:
            os.remove( file_name )
    def shutdown( self ):
        self.output_file.close()
        return

def __main__():
    # Parse the command line
    parser = optparse.OptionParser()
    parser.add_option( '', '--ref_name', dest='ref_name', help='The reference name to change all output matches to' )
    parser.add_option( '', '--ref_source', dest='ref_source', help='Whether the reference is cached or from the history' )
    parser.add_option( '', '--ref_sequences', dest='ref_sequences', help='Number of sequences in the reference dataset' )
    parser.add_option( '', '--source_select', dest='source_select', help='Whether to use a pre-set or cached reference file' )
    parser.add_option( '', '--input1', dest='input1', help='The name of the reference file if using history or reference base name if using cached' )
    parser.add_option( '', '--input2', dest='input2', help='The reads file to align' )
    parser.add_option( '', '--pre_set_options', dest='pre_set_options', help='Which of the pre-set options to use, if using pre-sets' )
    parser.add_option( '', '--strand', dest='strand', help='Which strand of the read to search, if specifying all parameters' )
    parser.add_option( '', '--seed', dest='seed', help='Seeding settings, if specifying all parameters' )
    parser.add_option( '', '--transition', dest='transition', help='Number of transitions to allow in each seed hit, if specifying all parameters' )
    parser.add_option( '', '--gfextend', dest='gfextend', help='Whether to perform gap-free extension of seed hits to HSPs (high-scoring segment pairs), if specifying all parameters' )
    parser.add_option( '', '--chain', dest='chain', help='Whether to perform chaining of HSPs, if specifying all parameters' )
    parser.add_option( '', '--O', dest='O', help='Gap opening penalty, if specifying all parameters' )
    parser.add_option( '', '--E', dest='E', help='Gap extension penalty, if specifying all parameters' )
    parser.add_option( '', '--X', dest='X', help='X-drop threshold, if specifying all parameters' )
    parser.add_option( '', '--Y', dest='Y', help='Y-drop threshold, if specifying all parameters' )
    parser.add_option( '', '--K', dest='K', help='Threshold for HSPs, if specifying all parameters' )
    parser.add_option( '', '--L', dest='L', help='Threshold for gapped alignments, if specifying all parameters' )
    parser.add_option( '', '--entropy', dest='entropy', help='Whether to involve entropy when filtering HSPs, if specifying all parameters' )
    parser.add_option( '', '--identity_min', dest='identity_min', help="Minimum identity (don't report matches under this identity)" )
    parser.add_option( '', '--identity_max', dest='identity_max', help="Maximum identity (don't report matches above this identity)" )
    parser.add_option( '', '--coverage', dest='coverage', help="The minimum coverage value (don't report matches covering less than this)" )
    parser.add_option( '', '--unmask', dest='unmask', help='Whether to convert lowercase bases to uppercase' )
    parser.add_option( '', '--out_format', dest='format', help='The format of the output file (sam, diffs, or tabular (general))' )
    parser.add_option( '', '--output', dest='output', help='The output file' )
    parser.add_option( '', '--lastzSeqsFileDir', dest='lastzSeqsFileDir', help='Directory of the local lastz_seqs.loc file' )
    ( options, args ) = parser.parse_args()

    if options.unmask == 'yes':
        unmask = '[unmask]'
    else:
        unmask = ''
    if options.ref_name != 'None':
        ref_name = '[nickname=%s]' % options.ref_name
    else:
        ref_name = ''
    # Prepare for commonly-used pre-set options
    if options.source_select == 'pre_set':
        set_options = '--%s' % options.pre_set_options
    # Prepare for user-specified options
    else:
        set_options = '--%s --%s --gapped --strand=%s --seed=%s --%s O=%s E=%s X=%s Y=%s K=%s L=%s --%s' % \
                      ( options.gfextend, options.chain, options.strand, options.seed, options.transition,
                        options.O, options.E, options.X, options.Y, options.K, options.L, options.entropy )
    # Specify input2 and add the [fullnames] modifier if the output format is diffs
    if options.format == 'diffs':
        input2 = '%s[fullnames]' % options.input2
    else:
        input2 = options.input2
    if options.format == 'tabular':
        # Change the output format to general if it's tabular and add field names
        # for tabular output; the trailing '-' tells lastz to omit header lines.
        format = 'general-'
        tabular_fields = ':score,name1,strand1,size1,start1,zstart1,end1,length1,text1,name2,strand2,size2,start2,zstart2,end2,start2+,zstart2+,end2+,length2,text2,diff,cigar,identity,coverage,gaprate,diagonal,shingle'
    elif options.format == 'sam':
        # We currently ALWAYS suppress SAM headers.
        format = 'sam-'
        tabular_fields = ''
    else:
        format = options.format
        tabular_fields = ''

    # Set up our queues
    lastz_job_queue = LastzJobQueue( WORKERS, slots=SLOTS )
    combine_data_queue = CombineDataQueue( options.output )

    if options.ref_source == 'history':
        # Reference is a fasta dataset from the history, so split the job across
        # the number of sequences in the dataset ( this could be a HUGE number ).
        try:
            # Ensure there is at least 1 sequence in the dataset ( this may not be necessary ).
            error_msg = "The reference dataset is missing metadata. Click the pencil icon in the history item and 'auto-detect' the metadata attributes."
            ref_sequences = int( options.ref_sequences )
            if ref_sequences < 1:
                stop_queues( lastz_job_queue, combine_data_queue )
                stop_err( error_msg )
        except ( TypeError, ValueError ):
            stop_queues( lastz_job_queue, combine_data_queue )
            stop_err( error_msg )
        seqs = 0
        fasta_reader = FastaReader( open( options.input1 ) )
        while True:
            # Read the next sequence from the reference dataset
            seq = fasta_reader.next()
            if not seq:
                break
            seqs += 1
            # Create a temporary file to contain the current sequence as input to lastz
            tmp_in_fd, tmp_in_name = tempfile.mkstemp( suffix='.in' )
            tmp_in = os.fdopen( tmp_in_fd, 'wb' )
            # Write the current sequence to the temporary input file
            tmp_in.write( '>%s\n%s\n' % ( seq.name, seq.text ) )
            tmp_in.close()
            # Create a 2nd temporary file to contain the output from lastz execution on the current sequence
            tmp_out_fd, tmp_out_name = tempfile.mkstemp( suffix='.out' )
            os.close( tmp_out_fd )
            # Generate the command line for calling lastz on the current sequence
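            # The generated command has the following shape ( the paths, nickname,
            # and preset shown here are illustrative only ):
            #   lastz /tmp/tmp1234.in[unmask][nickname=myref] reads.fasta --yasra95short \
            #       --ambiguousn --nolaj --identity=90..100 --coverage=0 --format=sam- > /tmp/tmp5678.out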
            command = 'lastz %s%s%s %s %s --ambiguousn --nolaj --identity=%s..%s --coverage=%s --format=%s%s > %s' % \
                      ( tmp_in_name, unmask, ref_name, input2, set_options, options.identity_min,
                        options.identity_max, options.coverage, format, tabular_fields, tmp_out_name )
            # Create a job object
            job = Bunch()
            job.command = command
            job.output = tmp_out_name
            job.cleanup = [ tmp_in_name, tmp_out_name ]
            job.combine_data_queue = combine_data_queue
            # Add another job to the lastz_job_queue.  Execution
            # will wait at this point if the queue is full.
            lastz_job_queue.put( job, block=True )
        # Make sure the number of sequences in the metadata is the same as the
        # number of sequences read from the dataset ( this may not be necessary ).
        if ref_sequences != seqs:
            stop_queues( lastz_job_queue, combine_data_queue )
            stop_err( "The value of metadata.sequences (%d) differs from the number of sequences read from the reference (%d)." % ( ref_sequences, seqs ) )
    else:
        # Reference is a locally cached 2bit file, so split the job across the
        # number of chroms in the 2bit file
        tbf = TwoBitFile( open( options.input1, 'rb' ) )
        for chrom in tbf.keys():
            # Create a temporary file to contain the output from lastz execution on the current chrom
            tmp_out_fd, tmp_out_name = tempfile.mkstemp( suffix='.out' )
            os.close( tmp_out_fd )
            command = 'lastz %s/%s%s%s %s %s --ambiguousn --nolaj --identity=%s..%s --coverage=%s --format=%s%s > %s' % \
                      ( options.input1, chrom, unmask, ref_name, input2, set_options, options.identity_min,
                        options.identity_max, options.coverage, format, tabular_fields, tmp_out_name )
            # Create a job object
            job = Bunch()
            job.command = command
            job.output = tmp_out_name
            job.cleanup = [ tmp_out_name ]
            job.combine_data_queue = combine_data_queue
            # Add another job to the lastz_job_queue.  Execution
            # will wait at this point if the queue is full.
            lastz_job_queue.put( job, block=True )

    # Stop the lastz_job_queue by sending one STOP_SIGNAL per worker thread
    for t in lastz_job_queue.threads:
        lastz_job_queue.put( STOP_SIGNAL, True )
    # Although all jobs have been submitted to the lastz_job_queue, we can't shut
    # down the combine_data_queue until we know that all jobs have been submitted
    # to its queue.  We do this by waiting until only the main thread and the
    # single combine_data_queue worker thread remain active.
    while threading.activeCount() > 2:
        time.sleep( 1 )
    # Now it's safe to stop the combine_data_queue
    combine_data_queue.put( STOP_SIGNAL )

if __name__ == "__main__":
    __main__()