root/galaxy-central/tools/filters/join.py @ 2

リビジョン 2, 15.0 KB (コミッタ: hatakeyama, 14 年 前)

import galaxy-central

行番号 
1#!/usr/bin/env python
2#Dan Blankenberg
3"""
4Script to Join Two Files on specified columns.
5
6Takes two tab delimited files, two column numbers (base 1) and outputs a new tab delimited file with lines joined by tabs.
7User can also opt to have non-joining rows of file1 echoed.
8
9"""
10
11import optparse, os, sys, tempfile, struct
12import psyco_full
13
# Optional Galaxy / simplejson support, only needed for --fill_options_file.
# A failure is remembered instead of raised so the script still works without
# fill options; main() re-raises the stored exception when the option is used.
# NOTE: if an import below fails, the names after it in this block (possibly
# including Bunch and stringify_dictionary_keys) are left undefined.
simplejson_exception = None
try:
    from galaxy import eggs
    from galaxy.util.bunch import Bunch
    from galaxy.util import stringify_dictionary_keys
    import pkg_resources
    pkg_resources.require("simplejson")
    import simplejson
except Exception as e:
    simplejson_exception = e
    simplejson = None
25
26
class OffsetList( object ):
    """List of integer offsets stored in a temporary file.

    Every offset is packed with a single struct format code (self.fmt), so
    entry i lives at byte position i * self.fmt_size of the backing file.
    """
    def __init__( self, filesize = 0, fmt = None ):
        """Create an empty list.

        filesize -- size of the file the offsets will point into; used to
                    pick the narrowest format that can hold every offset.
        fmt      -- explicit struct format code; overrides filesize.
        """
        self.file = tempfile.NamedTemporaryFile( 'w+b' )
        if fmt:
            self.fmt = fmt
        # Bound by what 'I' can actually store rather than by sys.maxint,
        # which on 64-bit builds is far larger than an unsigned int and
        # would let offsets overflow the packed field.
        elif filesize and filesize <= 2 ** ( 8 * struct.calcsize( 'I' ) ) - 1:
            self.fmt = 'I'
        else:
            self.fmt = 'Q'
        self.fmt_size = struct.calcsize( self.fmt )
    @property
    def size( self ):
        """Number of offsets currently stored."""
        # floor division keeps this an integer on every Python version
        return self.file_size // self.fmt_size
    @property
    def file_size( self ):
        """Size in bytes of the backing file (flushed first)."""
        self.file.flush()
        return os.stat( self.file.name ).st_size
    def add_offset( self, offset ):
        """Append one offset, or every offset of a list, to the end."""
        if not isinstance( offset, list ):
            offset = [offset]
        self.file.seek( 0, os.SEEK_END )
        for off in offset:
            self.file.write( struct.pack( self.fmt, off ) )
    def get_offsets( self, start = 0 ):
        """Yield the stored offsets in order, beginning at index start."""
        self.file.seek( start * self.fmt_size )
        while True:
            packed = self.file.read( self.fmt_size )
            if not packed:
                break
            yield struct.unpack( self.fmt, packed )[0]
    def get_offset_by_index( self, index ):
        """Return the offset stored at position index."""
        self.file.seek( index * self.fmt_size )
        return struct.unpack( self.fmt, self.file.read( self.fmt_size ) )[0]
    def set_offset_at_index( self, index, offset ):
        """Place offset (a value or a list of values) at position index.

        An index at or past the end appends.  Otherwise the new value(s) are
        inserted in front of the entry currently at index; existing entries
        are kept and shifted back, not overwritten.
        """
        if not isinstance( offset, list ):
            offset = [offset]
        if index >= self.size:
            self.add_offset( offset )
        else:
            temp_file = tempfile.NamedTemporaryFile( 'w+b' )
            self.file.seek( 0 )
            temp_file.write( self.file.read( index * self.fmt_size ) )
            for off in offset:
                temp_file.write( struct.pack( self.fmt, off ) )
            # the read position is just past the copied prefix, so this
            # carries over every remaining entry
            temp_file.write( self.file.read() )
            old_file = self.file
            self.file = temp_file
            # release the superseded temporary file right away
            old_file.close()
73
class SortedOffsets( OffsetList ):
    """Offsets into indexed_filename, kept ordered by line identifier.

    The identifier of an offset is the field at position column of the line
    that starts at that offset, after splitting the line on split.
    """
    def __init__( self, indexed_filename, column, split = None ):
        """Open indexed_filename for reading and start with no offsets.

        column -- zero-based index of the identifier field.
        split  -- delimiter handed to str.split() (None splits on whitespace).
        """
        OffsetList.__init__( self, os.stat( indexed_filename ).st_size )
        self.indexed_filename = indexed_filename
        self.indexed_file = open( indexed_filename, 'rb' )
        self.column = column
        self.split = split
        self.last_identifier = None
        self.last_identifier_merged = None
        self.last_offset_merged = 0
    def merge_with_dict( self, new_offset_dict ):
        """Merge { identifier: [offsets] } into the stored offsets.

        Performs a two-way merge between the offsets already stored (read in
        order) and the dictionary keys (sorted), writing the result to a
        fresh OffsetList whose backing file then replaces self.file.  On
        equal identifiers the already-stored offsets are written first.
        """
        if not new_offset_dict:
            return #no items to merge in
        # walk the sorted keys with an iterator; popping from the front of a
        # list costs O(n) per key
        pending = iter( sorted( new_offset_dict ) )
        identifier2 = next( pending, None )

        result_offsets = OffsetList( fmt = self.fmt )
        offsets1 = enumerate( self.get_offsets() )
        try:
            index1, offset1 = next( offsets1 )
            identifier1 = self.get_identifier_by_offset( offset1 )
        except StopIteration:
            offset1 = None
            identifier1 = None
            index1 = 0

        while True:
            if identifier1 is None and identifier2 is None:
                #both sides exhausted: self is now the merged result
                old_file = self.file
                self.file = result_offsets.file
                old_file.close()
                return
            elif identifier1 is None or ( identifier2 and identifier2 < identifier1 ):
                #new identifier sorts first (or nothing stored is left)
                result_offsets.add_offset( new_offset_dict[identifier2] )
                identifier2 = next( pending, None )
            elif identifier2 is None:
                #no new identifiers left: bulk copy the stored remainder,
                #starting with the entry at index1 that is still pending
                result_offsets.file.seek( result_offsets.file_size )
                self.file.seek( index1 * self.fmt_size )
                result_offsets.file.write( self.file.read() )
                identifier1 = None
                offset1 = None
            else:
                #stored identifier sorts first (or ties): keep it, advance
                result_offsets.add_offset( offset1 )
                try:
                    index1, offset1 = next( offsets1 )
                    identifier1 = self.get_identifier_by_offset( offset1 )
                except StopIteration:
                    offset1 = None
                    identifier1 = None
                    index1 += 1
    #methods to help link offsets to lines, ids, etc
    def get_identifier_by_line( self, line ):
        """Return the identifier field of line, or None if it has too few fields."""
        if isinstance( line, str ):
            fields = line.rstrip( '\r\n' ).split( self.split )
            if self.column < len( fields ):
                return fields[self.column]
        return None
    def get_line_by_offset( self, offset ):
        """Return the line of the indexed file that starts at offset."""
        self.indexed_file.seek( offset )
        return self.indexed_file.readline()
    def get_identifier_by_offset( self, offset ):
        """Return the identifier of the line that starts at offset."""
        return self.get_identifier_by_line( self.get_line_by_offset( offset ) )
137
#indexed set of offsets, index is built on demand
class OffsetIndex( object ):
    """Identifier lookup over a file, bucketed by first character.

    self._offsets maps the first character of an identifier to the
    SortedOffsets holding every offset whose identifier starts with it.
    self._index maps, per bucket, each identifier prefix of length
    index_depth to the position of its first entry in that bucket; it is
    built lazily and discarded whenever new offsets are merged in.
    """
    def __init__( self, filename, column, split = None, index_depth = 3 ):
        """Open filename for reading; no offsets are known yet.

        column      -- zero-based index of the identifier field.
        split       -- delimiter handed to str.split().
        index_depth -- identifier prefix length used as index key.
        """
        self.filename = filename
        self.file = open( filename, 'rb' )
        self.column = column
        self.split = split
        self._offsets = {}
        self._index = None
        self.index_depth = index_depth
    def _build_index( self ):
        """Record, per bucket, the first position of every identifier prefix."""
        self._index = {}
        for start_char, sorted_offsets in self._offsets.items():
            prefix_positions = {}
            self._index[start_char] = prefix_positions
            for i, offset in enumerate( sorted_offsets.get_offsets() ):
                identifier = sorted_offsets.get_identifier_by_offset( offset )
                prefix = identifier[0:self.index_depth]
                if prefix not in prefix_positions:
                    prefix_positions[prefix] = i
    def get_lines_by_identifier( self, identifier ):
        """Yield every indexed line whose identifier equals identifier."""
        if not identifier:
            return
        #if index doesn't exist, build it
        if self._index is None:
            self._build_index()
        first_char = identifier[0]
        prefix = identifier[0:self.index_depth]
        #identifier cannot exist
        if first_char not in self._index or prefix not in self._index[first_char]:
            return
        #identifier might exist, search for it from the first entry sharing
        #its prefix; entries are sorted, so stop once past the identifier
        sorted_offsets = self._offsets[first_char]
        offset_index = self._index[first_char][prefix]
        #size flushes and stats the backing file, so read it once
        total = sorted_offsets.size
        while offset_index < total:
            offset = sorted_offsets.get_offset_by_index( offset_index )
            identifier2 = sorted_offsets.get_identifier_by_offset( offset )
            if not identifier2 or identifier2 > identifier:
                return
            if identifier2 == identifier:
                yield sorted_offsets.get_line_by_offset( offset )
            offset_index += 1
    def get_offsets( self ):
        """Yield all offsets, bucket by bucket in sorted bucket order."""
        for key in sorted( self._offsets ):
            for offset in self._offsets[key].get_offsets():
                yield offset
    def get_line_by_offset( self, offset ):
        """Return the line of the file that starts at offset."""
        self.file.seek( offset )
        return self.file.readline()
    def get_identifiers_offsets( self ):
        """Yield ( identifier, offset ) pairs in sorted bucket order."""
        for key in sorted( self._offsets ):
            bucket = self._offsets[key]
            for offset in bucket.get_offsets():
                yield bucket.get_identifier_by_offset( offset ), offset
    def get_identifier_by_line( self, line ):
        """Return the identifier field of line, or None if it has too few fields."""
        if isinstance( line, str ):
            fields = line.rstrip( '\r\n' ).split( self.split )
            if self.column < len( fields ):
                return fields[self.column]
        return None
    def merge_with_dict( self, d ):
        """Merge { identifier: [offsets] } into the per-character buckets."""
        if not d:
            return #no data to merge
        self._index = None #stale once new offsets arrive
        #group identifiers by first character in a single pass
        grouped = {}
        for identifier in d:
            grouped.setdefault( identifier[0], {} )[identifier] = d[identifier]
        for first_char in sorted( grouped ):
            if first_char not in self._offsets:
                self._offsets[first_char] = SortedOffsets( self.filename, self.column, self.split )
            self._offsets[first_char].merge_with_dict( grouped[first_char] )
220
class BufferedIndex( object ):
    """Index of a file's identifiers: a file-backed OffsetIndex plus an
    in-memory dictionary of the offsets that have not been flushed to it.
    """
    def __init__( self, filename, column, split = None, buffer = 1000000, index_depth = 3 ):
        """Scan filename once and record the offset of every identified line.

        column      -- zero-based index of the identifier field.
        split       -- delimiter handed to str.split().
        buffer      -- flush the in-memory offsets into the file-backed index
                       each time the running count reaches a multiple of
                       this value; 0 never flushes.
        index_depth -- prefix length used by the file-backed index.
        """
        self.index = OffsetIndex( filename, column, split, index_depth )
        self.buffered_offsets = {}
        identified_offset_count = 1
        f = open( filename, 'rb' )
        try:
            while True:
                #remember where the line starts before consuming it
                offset = f.tell()
                line = f.readline()
                if not line:
                    break #EOF
                identifier = self.index.get_identifier_by_line( line )
                if not identifier:
                    continue
                #flush buffered offsets, if buffer size reached
                if buffer and identified_offset_count % buffer == 0:
                    self.index.merge_with_dict( self.buffered_offsets )
                    self.buffered_offsets = {}
                if identifier not in self.buffered_offsets:
                    self.buffered_offsets[identifier] = []
                self.buffered_offsets[identifier].append( offset )
                identified_offset_count += 1
        finally:
            #close the scan handle even when indexing fails
            f.close()

    def get_lines_by_identifier( self, identifier ):
        """Yield matching lines, flushed ones first, then buffered ones."""
        for line in self.index.get_lines_by_identifier( identifier ):
            yield line
        if identifier in self.buffered_offsets:
            for offset in self.buffered_offsets[identifier]:
                yield self.index.get_line_by_offset( offset )
250
251
def fill_empty_columns( line, split, fill_values ):
    """Return line with its empty columns replaced by fill values.

    line        -- one row, without its line terminator.
    split       -- column delimiter; None splits on whitespace and the
                   result is joined with a tab.
    fill_values -- sequence of replacement strings, by column position.
                   When empty or None, line is returned unchanged.

    Columns beyond the end of fill_values are left as they are; when the row
    has fewer columns than fill_values, the missing ones are appended.
    """
    if not fill_values:
        return line
    filled_columns = []
    for i, field in enumerate( line.split( split ) ):
        if field or i >= len( fill_values ):
            filled_columns.append( field )
        else:
            filled_columns.append( fill_values[i] )
    #pad short rows; the slice is empty when the row is already long enough
    filled_columns.extend( fill_values[ len( filled_columns ) : ] )
    #None is a valid split argument but cannot join; fall back to a tab
    if split is None:
        delimiter = "\t"
    else:
        delimiter = split
    return delimiter.join( filled_columns )
264
265
def join_files( filename1, column1, filename2, column2, out_filename, split = None, buffer = 1000000, keep_unmatched = False, keep_partial = False, index_depth = 3, fill_options = None ):
    """Join the lines of filename1 with the lines of filename2 on a column.

    filename2 is indexed by the field at column2; filename1 is then read
    line by line and each line is written once per matching line of
    filename2, followed by that matching line.

    column1, column2 -- zero-based indexes of the identifier fields.
    out_filename     -- file the joined rows are written to.
    split            -- field delimiter; None splits on whitespace and
                        joins output with a tab.
    buffer           -- flush interval handed to BufferedIndex.
    keep_unmatched   -- also write rows of filename1 that matched nothing.
    keep_partial     -- also write rows of filename1 without an identifier.
    index_depth      -- prefix length handed to BufferedIndex.
    fill_options     -- object with fill_unjoined_only, file1_columns and
                        file2_columns attributes; defaults to no filling.
    """
    #return identifier based upon line
    def get_identifier_by_line( line, column, split = None ):
        if isinstance( line, str ):
            fields = line.rstrip( '\r\n' ).split( split )
            if column < len( fields ):
                return fields[column]
        return None
    #write a row of file1 that has no partner from file2
    def write_unjoined( row ):
        out.write( fill_empty_columns( row, split, fill_options.file1_columns ) )
        if fill_options and fill_options.file2_columns:
            out.write( "%s%s" % ( delimiter, fill_empty_columns( "", split, fill_options.file2_columns ) ) )
        out.write( "\n" )
    if fill_options is None:
        fill_options = Bunch( fill_unjoined_only = True, file1_columns = None, file2_columns = None )
    #formatting None into the output would emit the text "None"
    if split is None:
        delimiter = "\t"
    else:
        delimiter = split
    out = open( out_filename, 'w+b' )
    try:
        index = BufferedIndex( filename2, column2, split, buffer, index_depth )
        in1 = open( filename1, 'rb' )
        try:
            for line1 in in1:
                row1 = line1.rstrip( '\r\n' )
                identifier = get_identifier_by_line( line1, column1, split )
                if identifier:
                    written = False
                    for line2 in index.get_lines_by_identifier( identifier ):
                        row2 = line2.rstrip( '\r\n' )
                        if not fill_options.fill_unjoined_only:
                            out.write( "%s%s%s\n" % ( fill_empty_columns( row1, split, fill_options.file1_columns ), delimiter, fill_empty_columns( row2, split, fill_options.file2_columns ) ) )
                        else:
                            out.write( "%s%s%s\n" % ( row1, delimiter, row2 ) )
                        written = True
                    if not written and keep_unmatched:
                        write_unjoined( row1 )
                elif keep_partial:
                    write_unjoined( row1 )
        finally:
            in1.close()
    finally:
        #close the output even when indexing or joining fails
        out.close()
301
def main():
    """Command line entry point.

    Positional arguments: file1 file2 column1 column2 output, where the
    column numbers are 1-based.  Exits with status 1 when they are missing
    or the column numbers are not integers.
    """
    parser = optparse.OptionParser()
    parser.add_option(
        '-b','--buffer',
        dest='buffer',
        type='int',default=1000000,
        help='Number of lines to buffer at a time. Default: 1,000,000 lines. A buffer of 0 will attempt to use memory only.'
    )
    parser.add_option(
        '-d','--index_depth',
        dest='index_depth',
        type='int',default=3,
        help='Depth to use on filebased offset indexing. Default: 3.'
    )
    parser.add_option(
        '-p','--keep_partial',
        action='store_true',
        dest='keep_partial',
        default=False,
        help='Keep rows in first input which are missing identifiers.')
    parser.add_option(
        '-u','--keep_unmatched',
        action='store_true',
        dest='keep_unmatched',
        default=False,
        help='Keep rows in first input which are not joined with the second input.')
    parser.add_option(
        '-f','--fill_options_file',
        dest='fill_options_file',
        type='str',default=None,
        help='Fill empty columns with a values from a JSONified file.')

    options, args = parser.parse_args()

    #fill options are best effort: any problem loading them is reported
    #and the join proceeds without them
    fill_options = None
    if options.fill_options_file is not None:
        try:
            if simplejson is None:
                raise simplejson_exception
            with open( options.fill_options_file ) as fill_options_fh:
                fill_options = Bunch( **stringify_dictionary_keys( simplejson.load( fill_options_fh ) ) )
        except Exception as e:
            print( "Warning: Ignoring fill options due to simplejson error (%s)." % e )
    if fill_options is None:
        fill_options = Bunch()
    if 'fill_unjoined_only' not in fill_options:
        fill_options.fill_unjoined_only = True
    if 'file1_columns' not in fill_options:
        fill_options.file1_columns = None
    if 'file2_columns' not in fill_options:
        fill_options.file2_columns = None

    try:
        filename1 = args[0]
        filename2 = args[1]
        #columns are given base 1 on the command line
        column1 = int( args[2] ) - 1
        column2 = int( args[3] ) - 1
        out_filename = args[4]
    except ( IndexError, ValueError ):
        sys.stderr.write( "Error parsing command line.\n" )
        #non-zero status so callers can detect the failure
        sys.exit( 1 )

    #Character for splitting fields and joining lines
    split = "\t"

    return join_files( filename1, column1, filename2, column2, out_filename, split, options.buffer, options.keep_unmatched, options.keep_partial, options.index_depth, fill_options = fill_options )

if __name__ == "__main__": main()
Note: リポジトリブラウザについてのヘルプは TracBrowser を参照してください。