root/galaxy-central/lib/galaxy_utils/sequence/fasta.py

リビジョン 2, 4.3 KB (コミッタ: hatakeyama, 14 年 前)

import galaxy-central

行番号 
#Dan Blankenberg
2
class fastaSequence(object):
    """A single FASTA record: a header identifier plus its sequence text."""

    def __init__(self):
        # Header text of the record; None until a reader or caller sets it.
        self.identifier = None
        # Raw sequence string: no whitespace.
        self.sequence = ''

    def __len__(self):
        """Return the number of characters in the stored sequence string."""
        return len(self.sequence)

    def __str__(self):
        """Return the identifier and the sequence, each followed by a newline."""
        return "%s\n%s\n" % (self.identifier, self.sequence)
11
class fastaReader(object):
    """Read FASTA records one at a time from a seekable file-like object.

    The handle must support ``readline()``, ``tell()`` and ``seek()``; the
    reader seeks back after peeking at the next record's header line.
    """

    def __init__(self, fh):
        self.file = fh

    def close(self):
        """Close the underlying file handle."""
        return self.file.close()

    def next(self):
        """Return the next record as a ``fastaSequence``.

        Raises:
            StopIteration: when no further record is available.
            AssertionError: when the first non-comment line does not start
                with ``>``.
        """
        line = self.file.readline()
        # Remove header comment lines.
        while line and line.startswith('#'):
            line = self.file.readline()
        if not line:
            raise StopIteration
        # Raised explicitly (rather than via ``assert``) so the check is not
        # stripped when Python runs with -O; the exception type is unchanged.
        if not line.startswith('>'):
            raise AssertionError("FASTA headers must start with >")
        rval = fastaSequence()
        rval.identifier = line.strip()

        # Collect pieces and join once instead of growing a string per line.
        parts = []
        # 454 qual test data that was used has decimal scores that don't have
        # trailing spaces, so once any line contains a space, every line from
        # then on gets a single trailing space appended as a separator.
        spaced = False
        offset = self.file.tell()
        while True:
            line = self.file.readline()
            if not line:
                break
            if line.startswith('>'):
                # Rewind so the following call sees this header again. The
                # position is kept on the file itself (not cached here)
                # because the code depends on tell()/seek() of the handle.
                self.file.seek(offset)
                break
            line = line.rstrip()
            spaced = spaced or ' ' in line
            parts.append(line)
            if spaced:
                parts.append(' ')
            offset = self.file.tell()
        rval.sequence = ''.join(parts)
        return rval

    # Python 3 spelling of the iterator protocol method.
    __next__ = next

    def __iter__(self):
        """Yield records until the input is exhausted."""
        while True:
            try:
                record = self.next()
            except StopIteration:
                # Under PEP 479 a StopIteration escaping a generator body
                # becomes RuntimeError, so end the generator explicitly.
                return
            yield record
46
class fastaNamedReader(object):
    """Look up FASTA records by identifier from a seekable file handle.

    Records that are read past while searching are remembered by file offset
    in ``offset_dict`` (identifier -> list of offsets) so they can be fetched
    later by seeking back.
    """

    # ``basestring`` only exists on Python 2; fall back to ``str`` elsewhere.
    try:
        _string_types = basestring
    except NameError:
        _string_types = str

    def __init__(self, fh):
        self.file = fh
        self.reader = fastaReader(self.file)
        self.offset_dict = {}
        # Set once a forward scan in get() has run off the end of the input.
        self.eof = False

    def close(self):
        """Close the underlying file handle."""
        return self.file.close()

    def get(self, sequence_id):
        """Return the record whose identifier equals ``sequence_id``.

        ``sequence_id`` may be a string or any object with an ``identifier``
        attribute. Returns None if the end of input is reached without a
        match.
        """
        if not isinstance(sequence_id, self._string_types):
            sequence_id = sequence_id.identifier

        if sequence_id in self.offset_dict:
            # Seen earlier while scanning: jump to the oldest remembered
            # offset, read the record, then restore the scan position.
            offsets = self.offset_dict[sequence_id]
            initial_offset = self.file.tell()
            seq_offset = offsets.pop(0)
            if not offsets:
                del self.offset_dict[sequence_id]
            self.file.seek(seq_offset)
            try:
                return self.reader.next()
            finally:
                # Restore the position even if the re-read raises.
                self.file.seek(initial_offset)

        # Not seen yet: scan forward, remembering every record skipped.
        while True:
            offset = self.file.tell()
            try:
                fasta_seq = self.reader.next()
            except StopIteration:
                self.eof = True
                return None  # eof, id not found
            if fasta_seq.identifier == sequence_id:
                return fasta_seq
            self.offset_dict.setdefault(fasta_seq.identifier, []).append(offset)

    def has_data(self):
        """Describe the data that has not been consumed.

        Returns a string representation of remaining data, or an empty string
        (falsy) if no data is remaining.
        """
        eof = self.eof
        count = sum(len(offsets) for offsets in self.offset_dict.values())
        if not eof:
            # Peek: try to read one more record, then put the position back.
            offset = self.file.tell()
            try:
                self.reader.next()
            except StopIteration:
                eof = True
            self.file.seek(offset)
        rval = ''
        if count:
            rval = "There were %i known sequences not utilized. " % count
        if not eof:
            rval = "%s%s" % (rval, "An additional unknown number of sequences exist in the input that were not utilized.")
        return rval
102
class fastaWriter(object):
    """Write records to a file handle in FASTA format."""

    def __init__(self, fh):
        self.file = fh

    def write(self, fastq_read):
        """Write one record as a ``>`` header line followed by its sequence.

        ``fastq_read`` must provide ``identifier`` and ``sequence``. The first
        character of the identifier is dropped and replaced with ``>``
        (presumably the FASTQ ``@`` marker, given the parameter name; verify
        against callers).
        """
        # This will include color space adapter base if applicable.
        header = fastq_read.identifier[1:]
        self.file.write(">%s\n%s\n" % (header, fastq_read.sequence))

    def close(self):
        """Close the underlying file handle."""
        return self.file.close()
Note: リポジトリブラウザについてのヘルプは TracBrowser を参照してください。