root/galaxy-central/lib/galaxy_utils/sequence/vcf.py

リビジョン 2, 4.6 KB (コミッタ: hatakeyama, 14 年 前)

import galaxy-central

行番号 
1#Dan Blankenberg
2#See: http://1000genomes.org/wiki/doku.php?id=1000_genomes:analysis:vcf3.3
3#See: http://1000genomes.org/wiki/doku.php?id=1000_genomes:analysis:variant_call_format
4
5class VariantCall( object ):
6    version = None
7    header_startswith = None
8    required_header_fields = None
9    required_header_length = None
10   
11    @classmethod
12    def get_class_by_format( cls, format ):
13        assert format in VCF_FORMATS, 'Unknown format type specified: %s' % format
14        return VCF_FORMATS[ format ]
15   
16    def __init__( self, vcf_line, metadata, sample_names ):
17        raise Exception( 'Abstract Method' )
18
19class VariantCall33( VariantCall ):
20    version = 'VCFv3.3'
21    header_startswith = '#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO'
22    required_header_fields = header_startswith.split( '\t' )
23    required_header_length = len( required_header_fields )
24   
25    def __init__( self, vcf_line, metadata, sample_names ):
26        # Raw line is needed for indexing file.
27        self.raw_line = vcf_line
28        self.line = vcf_line.rstrip( '\n\r' )
29        self.metadata = metadata
30        self.sample_names = sample_names
31        self.format = None
32        self.sample_values = []
33       
34        #parse line
35        self.fields = self.line.split( '\t' )
36        if sample_names:
37            assert len( self.fields ) == self.required_header_length + len( sample_names ) + 1, 'Provided VCF line (%s) has wrong length (expected: %i)' % ( self.line, self.required_header_length + len( sample_names ) + 1 )
38        else:
39            assert len( self.fields ) == self.required_header_length, 'Provided VCF line (%s) has wrong length (expected: %i)' % ( self.line, self.required_header_length)
40        self.chrom, self.pos, self.id, self.ref, self.alt, self.qual, self.filter, self.info = self.fields[ :self.required_header_length ]
41        self.pos = int( self.pos )
42        self.alt = self.alt.split( ',' )
43        self.qual = float( self.qual )
44        if len( self.fields ) > self.required_header_length:
45            self.format = self.fields[ self.required_header_length ].split( ':' )
46            for sample_value in self.fields[ self.required_header_length + 1: ]:
47                self.sample_values.append( sample_value.split( ':' ) )
48
49class VariantCall40( VariantCall33 ):
50    version = 'VCFv4.0'
51    def __init__( self, vcf_line, metadata, sample_names ):
52        VariantCall33.__init__( self, vcf_line, metadata, sample_names)
53
54#VCF Format version lookup dict
55VCF_FORMATS = {}
56for format in [ VariantCall33, VariantCall40 ]:
57    VCF_FORMATS[format.version] = format
58
59class Reader( object ):
60    def __init__( self, fh ):
61        self.vcf_file = fh
62        self.metadata = {}
63        self.header_fields = None
64        self.metadata_len = 0
65        self.sample_names = []
66        self.vcf_class = None
67       
68        # Read file metadata.
69        while True:
70            line = self.vcf_file.readline()
71            self.metadata_len += len( line )
72            assert line, 'Invalid VCF file provided.'
73            line = line.rstrip( '\r\n' )
74            if self.vcf_class and line.startswith( self.vcf_class.header_startswith ):
75                # read the header fields, ignoring any blank tabs, which GATK
76                # VCF produces after the sample
77                self.header_fields = [l for l in line.split( '\t' ) if l]
78                if len( self.header_fields ) > self.vcf_class.required_header_length:
79                    for sample_name in self.header_fields[ self.vcf_class.required_header_length + 1 : ]:
80                        self.sample_names.append( sample_name )
81                break
82            assert line.startswith( '##' ), 'Non-metadata line found before header'
83            line = line[2:] #strip ##
84            metadata = line.split( '=', 1 )
85            metadata_name = metadata[ 0 ]
86            if len( metadata ) == 2:
87                metadata_value = metadata[ 1 ]
88            else:
89                metadata_value = None
90            if metadata_name in self.metadata:
91                if not isinstance( self.metadata[ metadata_name ], list ):
92                    self.metadata[ metadata_name ] = [ self.metadata[ metadata_name ] ]
93                self.metadata[ metadata_name ].append( metadata_value )
94            else:
95                self.metadata[ metadata_name ] = metadata_value
96            if metadata_name == 'fileformat':
97                self.vcf_class = VariantCall.get_class_by_format( metadata_value )
98    def next( self ):
99        line = self.vcf_file.readline()
100        if not line:
101            raise StopIteration
102        return self.vcf_class( line, self.metadata, self.sample_names )
103    def __iter__( self ):
104        while True:
105            yield self.next()
Note: リポジトリブラウザについてのヘルプは TracBrowser を参照してください。