#!/usr/bin/env python
#Dan Blankenberg
"""
Script to Join Two Files on specified columns.

Takes two tab-delimited files, two column numbers (1-based), and outputs a new tab-delimited file with lines joined by tabs.
User can also opt to have non-joining rows of file1 echoed.

"""
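#Example invocation (assuming this script is saved as join.py; file names
#are hypothetical). Join column 2 of input1.tsv to column 2 of input2.tsv:
#  python join.py input1.tsv input2.tsv 2 2 joined.tsv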

import optparse, os, sys, tempfile, struct
import psyco_full

try:
    simplejson_exception = None
    from galaxy import eggs
    from galaxy.util.bunch import Bunch
    from galaxy.util import stringify_dictionary_keys
    import pkg_resources
    pkg_resources.require("simplejson")
    import simplejson
except Exception, e:
    simplejson_exception = e
    simplejson = None


class OffsetList:
    def __init__( self, filesize = 0, fmt = None ):
        self.file = tempfile.NamedTemporaryFile( 'w+b' )
        if fmt:
            self.fmt = fmt
        elif filesize and filesize <= 4294967295: #offsets fit in an unsigned 32-bit 'I'
            self.fmt = 'I'
        else:
            self.fmt = 'Q'
        self.fmt_size = struct.calcsize( self.fmt )
    @property
    def size( self ):
        self.file.flush()
        return self.file_size / self.fmt_size
    @property
    def file_size( self ):
        self.file.flush()
        return os.stat( self.file.name ).st_size
    def add_offset( self, offset ):
        if not isinstance( offset, list ):
            offset = [offset]
        self.file.seek( self.file_size ) #append to the end of the offsets file
        for off in offset:
            self.file.write( struct.pack( self.fmt, off ) )
    def get_offsets( self, start = 0 ):
        self.file.seek( start * self.fmt_size )
        while True:
            packed = self.file.read( self.fmt_size )
            if not packed: break
            yield struct.unpack( self.fmt, packed )[0]
    def get_offset_by_index( self, index ):
        self.file.seek( index * self.fmt_size )
        return struct.unpack( self.fmt, self.file.read( self.fmt_size ) )[0]
    def set_offset_at_index( self, index, offset ):
        if not isinstance( offset, list ):
            offset = [offset]
        if index >= self.size:
            self.add_offset( offset )
        else:
            #rewrite the offsets file with the new offsets spliced in at index
            temp_file = tempfile.NamedTemporaryFile( 'w+b' )
            self.file.seek( 0 )
            temp_file.write( self.file.read( index * self.fmt_size ) )
            for off in offset:
                temp_file.write( struct.pack( self.fmt, off ) )
            temp_file.write( self.file.read() )
            self.file = temp_file
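
#A minimal sketch of the storage layout used above (values hypothetical):
#offsets are fixed-width struct records, so record i always starts at byte
#i * fmt_size and can be reached with a single seek:
#  packed = struct.pack( 'Q', 12345 )         #8-byte unsigned record
#  struct.unpack( 'Q', packed )[0] == 12345   #round-trips exactly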

class SortedOffsets( OffsetList ):
    def __init__( self, indexed_filename, column, split = None ):
        OffsetList.__init__( self, os.stat( indexed_filename ).st_size )
        self.indexed_filename = indexed_filename
        self.indexed_file = open( indexed_filename, 'rb' )
        self.column = column
        self.split = split
        self.last_identifier = None
        self.last_identifier_merged = None
        self.last_offset_merged = 0
    def merge_with_dict( self, new_offset_dict ):
        if not new_offset_dict: return #no items to merge in
        keys = new_offset_dict.keys()
        keys.sort()
        identifier2 = keys.pop( 0 )

        result_offsets = OffsetList( fmt = self.fmt )
        offsets1 = enumerate( self.get_offsets() )
        try:
            index1, offset1 = offsets1.next()
            identifier1 = self.get_identifier_by_offset( offset1 )
        except StopIteration:
            offset1 = None
            identifier1 = None
            index1 = 0

        #standard merge of two sorted streams: existing offsets vs. new dict keys
        while True:
            if identifier1 is None and identifier2 is None:
                self.file = result_offsets.file #self is now merged results
                return
            elif identifier1 is None or ( identifier2 and identifier2 < identifier1 ):
                result_offsets.add_offset( new_offset_dict[identifier2] )
                if keys:
                    identifier2 = keys.pop( 0 )
                else:
                    identifier2 = None
            elif identifier2 is None:
                #no more new items; bulk-copy the remaining existing offsets
                result_offsets.file.seek( result_offsets.file_size )
                self.file.seek( index1 * self.fmt_size )
                result_offsets.file.write( self.file.read() )
                identifier1 = None
                offset1 = None
            else:
                result_offsets.add_offset( offset1 )
                try:
                    index1, offset1 = offsets1.next()
                    identifier1 = self.get_identifier_by_offset( offset1 )
                except StopIteration:
                    offset1 = None
                    identifier1 = None
                    index1 += 1
    #methods to help link offsets to lines, ids, etc
    def get_identifier_by_line( self, line ):
        if isinstance( line, str ):
            fields = line.rstrip( '\r\n' ).split( self.split )
            if self.column < len( fields ):
                return fields[self.column]
        return None
    def get_line_by_offset( self, offset ):
        self.indexed_file.seek( offset )
        return self.indexed_file.readline()
    def get_identifier_by_offset( self, offset ):
        return self.get_identifier_by_line( self.get_line_by_offset( offset ) )

#indexed set of offsets, index is built on demand
class OffsetIndex:
    def __init__( self, filename, column, split = None, index_depth = 3 ):
        self.filename = filename
        self.file = open( filename, 'rb' )
        self.column = column
        self.split = split
        self._offsets = {}
        self._index = None
        self.index_depth = index_depth
    def _build_index( self ):
        self._index = {}
        for start_char, sorted_offsets in self._offsets.items():
            self._index[start_char] = {}
            for i, offset in enumerate( sorted_offsets.get_offsets() ):
                identifier = sorted_offsets.get_identifier_by_offset( offset )
                if identifier[0:self.index_depth] not in self._index[start_char]:
                    self._index[start_char][identifier[0:self.index_depth]] = i
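    #A sketch of the two-level index built above (identifiers hypothetical):
    #with index_depth = 3, lines whose identifiers are 'chr1' and 'chr2' both
    #sort under start_char 'c' with prefix 'chr', so
    #  self._index == { 'c': { 'chr': 0 } }
    #maps each prefix to the first position in the sorted offsets where that
    #prefix appears, letting lookups skip straight to that region.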
    def get_lines_by_identifier( self, identifier ):
        if not identifier: return
        #if the index doesn't exist yet, build it
        if self._index is None: self._build_index()

        #the identifier cannot be present if its prefix is not indexed
        if identifier[0] not in self._index or identifier[0:self.index_depth] not in self._index[identifier[0]]:
            return
        #identifier might exist, scan forward from the indexed position
        offset_index = self._index[identifier[0]][identifier[0:self.index_depth]]
        while True:
            if offset_index >= self._offsets[identifier[0]].size:
                return
            offset = self._offsets[identifier[0]].get_offset_by_index( offset_index )
            identifier2 = self._offsets[identifier[0]].get_identifier_by_offset( offset )
            if not identifier2 or identifier2 > identifier:
                return
            if identifier2 == identifier:
                yield self._offsets[identifier[0]].get_line_by_offset( offset )
            offset_index += 1
    def get_offsets( self ):
        keys = self._offsets.keys()
        keys.sort()
        for key in keys:
            for offset in self._offsets[key].get_offsets():
                yield offset
    def get_line_by_offset( self, offset ):
        self.file.seek( offset )
        return self.file.readline()
    def get_identifiers_offsets( self ):
        keys = self._offsets.keys()
        keys.sort()
        for key in keys:
            for offset in self._offsets[key].get_offsets():
                yield self._offsets[key].get_identifier_by_offset( offset ), offset
    def get_identifier_by_line( self, line ):
        if isinstance( line, str ):
            fields = line.rstrip( '\r\n' ).split( self.split )
            if self.column < len( fields ):
                return fields[self.column]
        return None
    def merge_with_dict( self, d ):
        if not d: return #no data to merge
        self._index = None
        #group the new offsets by first character and merge each group into
        #the SortedOffsets for that character
        keys = d.keys()
        keys.sort()
        identifier = keys.pop( 0 )
        first_char = identifier[0]
        temp = { identifier: d[identifier] }
        while True:
            if not keys:
                if first_char not in self._offsets:
                    self._offsets[first_char] = SortedOffsets( self.filename, self.column, self.split )
                self._offsets[first_char].merge_with_dict( temp )
                return
            identifier = keys.pop( 0 )
            if identifier[0] == first_char:
                temp[identifier] = d[identifier]
            else:
                if first_char not in self._offsets:
                    self._offsets[first_char] = SortedOffsets( self.filename, self.column, self.split )
                self._offsets[first_char].merge_with_dict( temp )
                temp = { identifier: d[identifier] }
                first_char = identifier[0]

class BufferedIndex:
    def __init__( self, filename, column, split = None, buffer = 1000000, index_depth = 3 ):
        self.index = OffsetIndex( filename, column, split, index_depth )
        self.buffered_offsets = {}
        f = open( filename, 'rb' )
        identified_offset_count = 1
        while True:
            offset = f.tell()
            line = f.readline()
            if not line: break #EOF
            identifier = self.index.get_identifier_by_line( line )
            if identifier:
                #flush buffered offsets, if buffer size reached
                if buffer and identified_offset_count % buffer == 0:
                    self.index.merge_with_dict( self.buffered_offsets )
                    self.buffered_offsets = {}
                if identifier not in self.buffered_offsets:
                    self.buffered_offsets[identifier] = []
                self.buffered_offsets[identifier].append( offset )
                identified_offset_count += 1
        f.close()

    def get_lines_by_identifier( self, identifier ):
        for line in self.index.get_lines_by_identifier( identifier ):
            yield line
        if identifier in self.buffered_offsets:
            for offset in self.buffered_offsets[identifier]:
                yield self.index.get_line_by_offset( offset )
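
#A minimal usage sketch of the index (file name and identifier hypothetical):
#  index = BufferedIndex( 'file2.tsv', 0 )
#  for line in index.get_lines_by_identifier( 'gene42' ):
#      print line.rstrip( '\r\n' )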


def fill_empty_columns( line, split, fill_values ):
    if not fill_values:
        return line
    filled_columns = []
    for i, field in enumerate( line.split( split ) ):
        if field or i >= len( fill_values ):
            filled_columns.append( field )
        else:
            filled_columns.append( fill_values[i] )
    if len( fill_values ) > len( filled_columns ):
        filled_columns.extend( fill_values[ len( filled_columns ) : ] )
    return split.join( filled_columns )
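
#For example (values hypothetical), empty fields are replaced positionally
#and missing trailing fields are appended from fill_values:
#  fill_empty_columns( 'a\t\tc', '\t', [ 'x', 'y', 'z', 'w' ] ) == 'a\ty\tc\tw'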


def join_files( filename1, column1, filename2, column2, out_filename, split = None, buffer = 1000000, keep_unmatched = False, keep_partial = False, index_depth = 3, fill_options = None ):
    #return identifier based upon line
    def get_identifier_by_line( line, column, split = None ):
        if isinstance( line, str ):
            fields = line.rstrip( '\r\n' ).split( split )
            if column < len( fields ):
                return fields[column]
        return None
    if fill_options is None:
        fill_options = Bunch( fill_unjoined_only = True, file1_columns = None, file2_columns = None )
    out = open( out_filename, 'w+b' )
    #index the second file by its join column, then stream through the first
    index = BufferedIndex( filename2, column2, split, buffer, index_depth )
    for line1 in open( filename1, 'rb' ):
        identifier = get_identifier_by_line( line1, column1, split )
        if identifier:
            written = False
            for line2 in index.get_lines_by_identifier( identifier ):
                if not fill_options.fill_unjoined_only:
                    out.write( "%s%s%s\n" % ( fill_empty_columns( line1.rstrip( '\r\n' ), split, fill_options.file1_columns ), split, fill_empty_columns( line2.rstrip( '\r\n' ), split, fill_options.file2_columns ) ) )
                else:
                    out.write( "%s%s%s\n" % ( line1.rstrip( '\r\n' ), split, line2.rstrip( '\r\n' ) ) )
                written = True
            if not written and keep_unmatched:
                out.write( fill_empty_columns( line1.rstrip( '\r\n' ), split, fill_options.file1_columns ) )
                if fill_options:
                    if fill_options.file2_columns:
                        out.write( "%s%s" % ( split, fill_empty_columns( "", split, fill_options.file2_columns ) ) )
                out.write( "\n" )
        elif keep_partial:
            out.write( fill_empty_columns( line1.rstrip( '\r\n' ), split, fill_options.file1_columns ) )
            if fill_options:
                if fill_options.file2_columns:
                    out.write( "%s%s" % ( split, fill_empty_columns( "", split, fill_options.file2_columns ) ) )
            out.write( "\n" )
    out.close()
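
#A minimal call sketch (file names hypothetical). Note the columns here are
#0-based, unlike the 1-based command-line arguments handled in main():
#  join_files( 'input1.tsv', 1, 'input2.tsv', 1, 'joined.tsv', split = '\t' )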

def main():
    parser = optparse.OptionParser()
    parser.add_option(
        '-b','--buffer',
        dest='buffer',
        type='int',default=1000000,
        help='Number of lines to buffer at a time. Default: 1,000,000 lines. A buffer of 0 will attempt to use memory only.'
    )
    parser.add_option(
        '-d','--index_depth',
        dest='index_depth',
        type='int',default=3,
        help='Depth to use on file-based offset indexing. Default: 3.'
    )
    parser.add_option(
        '-p','--keep_partial',
        action='store_true',
        dest='keep_partial',
        default=False,
        help='Keep rows in first input which are missing identifiers.')
    parser.add_option(
        '-u','--keep_unmatched',
        action='store_true',
        dest='keep_unmatched',
        default=False,
        help='Keep rows in first input which are not joined with the second input.')
    parser.add_option(
        '-f','--fill_options_file',
        dest='fill_options_file',
        type='str',default=None,
        help='Fill empty columns with values from a JSON file.')


    options, args = parser.parse_args()

    fill_options = None
    if options.fill_options_file is not None:
        try:
            if simplejson is None:
                raise simplejson_exception
            fill_options = Bunch( **stringify_dictionary_keys( simplejson.load( open( options.fill_options_file ) ) ) ) #dict keys must be strings to be usable as keyword arguments
        except Exception, e:
            print "Warning: Ignoring fill options due to simplejson error (%s)." % e
    if fill_options is None:
        fill_options = Bunch()
    if 'fill_unjoined_only' not in fill_options:
        fill_options.fill_unjoined_only = True
    if 'file1_columns' not in fill_options:
        fill_options.file1_columns = None
    if 'file2_columns' not in fill_options:
        fill_options.file2_columns = None
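    #A sketch of the expected fill options file (values hypothetical); the
    #three keys handled above are the ones this script reads:
    #  { "fill_unjoined_only": true,
    #    "file1_columns": [ "", "0" ],
    #    "file2_columns": [ "N/A" ] }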


    try:
        filename1 = args[0]
        filename2 = args[1]
        column1 = int( args[2] ) - 1 #convert from 1-based to 0-based
        column2 = int( args[3] ) - 1
        out_filename = args[4]
    except:
        print >> sys.stderr, "Error parsing command line."
        sys.exit()

    #Character for splitting fields and joining lines
    split = "\t"

    return join_files( filename1, column1, filename2, column2, out_filename, split, options.buffer, options.keep_unmatched, options.keep_partial, options.index_depth, fill_options = fill_options )

if __name__ == "__main__": main()