#!/usr/bin/env python
# Dan Blankenberg
"""
Script to join two files on specified columns.

Takes two tab-delimited files, two column numbers (base 1) and outputs a new tab-delimited file with lines joined by tabs.
User can also opt to have non-joining rows of file1 echoed.

"""
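
# Example invocation (illustrative; the file names below are hypothetical):
#   python join.py file1.tabular file2.tabular 1 1 joined.tabular --keep_unmatched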

import optparse, os, sys, tempfile, struct
import psyco_full

try:
    simplejson_exception = None
    from galaxy import eggs
    from galaxy.util.bunch import Bunch
    from galaxy.util import stringify_dictionary_keys
    import pkg_resources
    pkg_resources.require("simplejson")
    import simplejson
except Exception, e:
    simplejson_exception = e
    simplejson = None


class OffsetList:
    def __init__( self, filesize = 0, fmt = None ):
        self.file = tempfile.NamedTemporaryFile( 'w+b' )
        if fmt:
            self.fmt = fmt
        elif filesize and filesize <= sys.maxint * 2:
            self.fmt = 'I'
        else:
            self.fmt = 'Q'
        self.fmt_size = struct.calcsize( self.fmt )
    @property
    def size( self ):
        self.file.flush()
        return self.file_size / self.fmt_size
    @property
    def file_size( self ):
        self.file.flush()
        return os.stat( self.file.name ).st_size
    def add_offset( self, offset ):
        if not isinstance( offset, list ):
            offset = [offset]
        self.file.seek( self.file_size )
        for off in offset:
            self.file.write( struct.pack( self.fmt, off ) )
    def get_offsets( self, start = 0 ):
        self.file.seek( start * self.fmt_size )
        while True:
            packed = self.file.read( self.fmt_size )
            if not packed: break
            yield struct.unpack( self.fmt, packed )[0]
    def get_offset_by_index( self, index ):
        self.file.seek( index * self.fmt_size )
        return struct.unpack( self.fmt, self.file.read( self.fmt_size ) )[0]
    def set_offset_at_index( self, index, offset ):
        if not isinstance( offset, list ):
            offset = [offset]
        if index >= self.size:
            self.add_offset( offset )
        else:
            temp_file = tempfile.NamedTemporaryFile( 'w+b' )
            self.file.seek( 0 )
            temp_file.write( self.file.read( ( index ) * self.fmt_size ) )
            for off in offset:
                temp_file.write( struct.pack( self.fmt, off ) )
            temp_file.write( self.file.read() )
            self.file = temp_file
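
# A minimal sketch of how OffsetList (above) is used elsewhere in this script
# (the offsets below are made-up byte positions, not taken from a real file):
#   offsets = OffsetList( filesize = os.stat( 'some.tabular' ).st_size )
#   offsets.add_offset( [ 0, 57, 113 ] )
#   list( offsets.get_offsets() )  # -> [0, 57, 113]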

class SortedOffsets( OffsetList ):
    def __init__( self, indexed_filename, column, split = None ):
        OffsetList.__init__( self, os.stat( indexed_filename ).st_size )
        self.indexed_filename = indexed_filename
        self.indexed_file = open( indexed_filename, 'rb' )
        self.column = column
        self.split = split
        self.last_identifier = None
        self.last_identifier_merged = None
        self.last_offset_merged = 0
    def merge_with_dict( self, new_offset_dict ):
        if not new_offset_dict: return #no items to merge in
        keys = new_offset_dict.keys()
        keys.sort()
        identifier2 = keys.pop( 0 )

        result_offsets = OffsetList( fmt = self.fmt )
        offsets1 = enumerate( self.get_offsets() )
        try:
            index1, offset1 = offsets1.next()
            identifier1 = self.get_identifier_by_offset( offset1 )
        except StopIteration:
            offset1 = None
            identifier1 = None
            index1 = 0

        while True:
            if identifier1 is None and identifier2 is None:
                self.file = result_offsets.file #self is now merged results
                return
            elif identifier1 is None or ( identifier2 and identifier2 < identifier1 ):
                result_offsets.add_offset( new_offset_dict[identifier2] )
                if keys:
                    identifier2 = keys.pop( 0 )
                else:
                    identifier2 = None
            elif identifier2 is None:
                result_offsets.file.seek( result_offsets.file_size )
                self.file.seek( index1 * self.fmt_size )
                result_offsets.file.write( self.file.read() )
                identifier1 = None
                offset1 = None
            else:
                result_offsets.add_offset( offset1 )
                try:
                    index1, offset1 = offsets1.next()
                    identifier1 = self.get_identifier_by_offset( offset1 )
                except StopIteration:
                    offset1 = None
                    identifier1 = None
                    index1 += 1
    #methods to help link offsets to lines, ids, etc
    def get_identifier_by_line( self, line ):
        if isinstance( line, str ):
            fields = line.rstrip( '\r\n' ).split( self.split )
            if self.column < len( fields ):
                return fields[self.column]
        return None
    def get_line_by_offset( self, offset ):
        self.indexed_file.seek( offset )
        return self.indexed_file.readline()
    def get_identifier_by_offset( self, offset ):
        return self.get_identifier_by_line( self.get_line_by_offset( offset ) )

#indexed set of offsets, index is built on demand
class OffsetIndex:
    def __init__( self, filename, column, split = None, index_depth = 3 ):
        self.filename = filename
        self.file = open( filename, 'rb' )
        self.column = column
        self.split = split
        self._offsets = {}
        self._index = None
        self.index_depth = index_depth
    def _build_index( self ):
        self._index = {}
        for start_char, sorted_offsets in self._offsets.items():
            self._index[start_char] = {}
            for i, offset in enumerate( sorted_offsets.get_offsets() ):
                identifier = sorted_offsets.get_identifier_by_offset( offset )
                if identifier[0:self.index_depth] not in self._index[start_char]:
                    self._index[start_char][identifier[0:self.index_depth]] = i
    def get_lines_by_identifier( self, identifier ):
        if not identifier: return
        #if index doesn't exist, build it
        if self._index is None: self._build_index()

        #identifier cannot exist
        if identifier[0] not in self._index or identifier[0:self.index_depth] not in self._index[identifier[0]]:
            return
        #identifier might exist, search for it
        offset_index = self._index[identifier[0]][identifier[0:self.index_depth]]
        while True:
            if offset_index >= self._offsets[identifier[0]].size:
                return
            offset = self._offsets[identifier[0]].get_offset_by_index( offset_index )
            identifier2 = self._offsets[identifier[0]].get_identifier_by_offset( offset )
            if not identifier2 or identifier2 > identifier:
                return
            if identifier2 == identifier:
                yield self._offsets[identifier[0]].get_line_by_offset( offset )
            offset_index += 1
    def get_offsets( self ):
        keys = self._offsets.keys()
        keys.sort()
        for key in keys:
            for offset in self._offsets[key].get_offsets():
                yield offset
    def get_line_by_offset( self, offset ):
        self.file.seek( offset )
        return self.file.readline()
    def get_identifiers_offsets( self ):
        keys = self._offsets.keys()
        keys.sort()
        for key in keys:
            for offset in self._offsets[key].get_offsets():
                yield self._offsets[key].get_identifier_by_offset( offset ), offset
    def get_identifier_by_line( self, line ):
        if isinstance( line, str ):
            fields = line.rstrip( '\r\n' ).split( self.split )
            if self.column < len( fields ):
                return fields[self.column]
        return None
    def merge_with_dict( self, d ):
        if not d: return #no data to merge
        self._index = None
        keys = d.keys()
        keys.sort()
        identifier = keys.pop( 0 )
        first_char = identifier[0]
        temp = { identifier: d[identifier] }
        while True:
            if not keys:
                if first_char not in self._offsets:
                    self._offsets[first_char] = SortedOffsets( self.filename, self.column, self.split )
                self._offsets[first_char].merge_with_dict( temp )
                return
            identifier = keys.pop( 0 )
            if identifier[0] == first_char:
                temp[identifier] = d[identifier]
            else:
                if first_char not in self._offsets:
                    self._offsets[first_char] = SortedOffsets( self.filename, self.column, self.split )
                self._offsets[first_char].merge_with_dict( temp )
                temp = { identifier: d[identifier] }
                first_char = identifier[0]
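
# Note on OffsetIndex (above), assuming the default index_depth of 3: lookups
# first bucket identifiers by their leading character, then jump to the first
# offset whose identifier shares the same 3-character prefix, and scan forward
# from there until the identifiers sort past the one being searched for.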

class BufferedIndex:
    def __init__( self, filename, column, split = None, buffer = 1000000, index_depth = 3 ):
        self.index = OffsetIndex( filename, column, split, index_depth )
        self.buffered_offsets = {}
        f = open( filename, 'rb' )
        offset = f.tell()
        identified_offset_count = 1
        while True:
            offset = f.tell()
            line = f.readline()
            if not line: break #EOF
            identifier = self.index.get_identifier_by_line( line )
            if identifier:
                #flush buffered offsets, if buffer size reached
                if buffer and identified_offset_count % buffer == 0:
                    self.index.merge_with_dict( self.buffered_offsets )
                    self.buffered_offsets = {}
                if identifier not in self.buffered_offsets:
                    self.buffered_offsets[identifier] = []
                self.buffered_offsets[identifier].append( offset )
                identified_offset_count += 1
        f.close()

    def get_lines_by_identifier( self, identifier ):
        for line in self.index.get_lines_by_identifier( identifier ):
            yield line
        if identifier in self.buffered_offsets:
            for offset in self.buffered_offsets[identifier]:
                yield self.index.get_line_by_offset( offset )
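
# A minimal usage sketch for BufferedIndex (above); the file name and the
# identifier are hypothetical:
#   index = BufferedIndex( 'file2.tabular', column = 0, split = '\t' )
#   for line in index.get_lines_by_identifier( 'chr1' ):
#       print line,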


def fill_empty_columns( line, split, fill_values ):
    if not fill_values:
        return line
    filled_columns = []
    for i, field in enumerate( line.split( split ) ):
        if field or i >= len( fill_values ):
            filled_columns.append( field )
        else:
            filled_columns.append( fill_values[i] )
    if len( fill_values ) > len( filled_columns ):
        filled_columns.extend( fill_values[ len( filled_columns ) : ] )
    return split.join( filled_columns )
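
# Illustrative behavior of fill_empty_columns (made-up values): empty fields are
# replaced positionally and missing trailing columns are appended from fill_values.
#   fill_empty_columns( "a\t\tc", "\t", [ "X", "Y", "Z", "W" ] )  # -> "a\tY\tc\tW"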


def join_files( filename1, column1, filename2, column2, out_filename, split = None, buffer = 1000000, keep_unmatched = False, keep_partial = False, index_depth = 3, fill_options = None ):
    #return identifier based upon line
    def get_identifier_by_line( line, column, split = None ):
        if isinstance( line, str ):
            fields = line.rstrip( '\r\n' ).split( split )
            if column < len( fields ):
                return fields[column]
        return None
    if fill_options is None:
        fill_options = Bunch( fill_unjoined_only = True, file1_columns = None, file2_columns = None )
    out = open( out_filename, 'w+b' )
    index = BufferedIndex( filename2, column2, split, buffer, index_depth )
    for line1 in open( filename1, 'rb' ):
        identifier = get_identifier_by_line( line1, column1, split )
        if identifier:
            written = False
            for line2 in index.get_lines_by_identifier( identifier ):
                if not fill_options.fill_unjoined_only:
                    out.write( "%s%s%s\n" % ( fill_empty_columns( line1.rstrip( '\r\n' ), split, fill_options.file1_columns ), split, fill_empty_columns( line2.rstrip( '\r\n' ), split, fill_options.file2_columns ) ) )
                else:
                    out.write( "%s%s%s\n" % ( line1.rstrip( '\r\n' ), split, line2.rstrip( '\r\n' ) ) )
                written = True
            if not written and keep_unmatched:
                out.write( fill_empty_columns( line1.rstrip( '\r\n' ), split, fill_options.file1_columns ) )
                if fill_options:
                    if fill_options.file2_columns:
                        out.write( "%s%s" % ( split, fill_empty_columns( "", split, fill_options.file2_columns ) ) )
                out.write( "\n" )
        elif keep_partial:
            out.write( fill_empty_columns( line1.rstrip( '\r\n' ), split, fill_options.file1_columns ) )
            if fill_options:
                if fill_options.file2_columns:
                    out.write( "%s%s" % ( split, fill_empty_columns( "", split, fill_options.file2_columns ) ) )
            out.write( "\n" )
    out.close()

def main():
    parser = optparse.OptionParser()
    parser.add_option(
        '-b','--buffer',
        dest='buffer',
        type='int',default=1000000,
        help='Number of lines to buffer at a time. Default: 1,000,000 lines. A buffer of 0 will attempt to use memory only.'
    )
    parser.add_option(
        '-d','--index_depth',
        dest='index_depth',
        type='int',default=3,
        help='Depth to use on file-based offset indexing. Default: 3.'
    )
    parser.add_option(
        '-p','--keep_partial',
        action='store_true',
        dest='keep_partial',
        default=False,
        help='Keep rows in first input which are missing identifiers.')
    parser.add_option(
        '-u','--keep_unmatched',
        action='store_true',
        dest='keep_unmatched',
        default=False,
        help='Keep rows in first input which are not joined with the second input.')
    parser.add_option(
        '-f','--fill_options_file',
        dest='fill_options_file',
        type='str',default=None,
        help='Fill empty columns with values from a JSONified file.')


    options, args = parser.parse_args()

    fill_options = None
    if options.fill_options_file is not None:
        try:
            if simplejson is None:
                raise simplejson_exception
            fill_options = Bunch( **stringify_dictionary_keys( simplejson.load( open( options.fill_options_file ) ) ) )
        except Exception, e:
            print "Warning: Ignoring fill options due to simplejson error (%s)." % e
    if fill_options is None:
        fill_options = Bunch()
    if 'fill_unjoined_only' not in fill_options:
        fill_options.fill_unjoined_only = True
    if 'file1_columns' not in fill_options:
        fill_options.file1_columns = None
    if 'file2_columns' not in fill_options:
        fill_options.file2_columns = None


    try:
        filename1 = args[0]
        filename2 = args[1]
        column1 = int( args[2] ) - 1
        column2 = int( args[3] ) - 1
        out_filename = args[4]
    except:
        print >> sys.stderr, "Error parsing command line."
        sys.exit()

    #Character for splitting fields and joining lines
    split = "\t"

    return join_files( filename1, column1, filename2, column2, out_filename, split, options.buffer, options.keep_unmatched, options.keep_partial, options.index_depth, fill_options = fill_options )

if __name__ == "__main__": main()