root/galaxy-central/eggs/bx_python-0.5.0_dev_f74aec067563-py2.6-macosx-10.6-universal-ucs2.egg/bx/seq/twobit.py @ 3

リビジョン 3, 4.1 KB (コミッタ: kohda, 14 年 前)

Install Unix tools  http://hannonlab.cshl.edu/galaxy_unix_tools/galaxy.html

行番号 
1"""
2Access to files containing sequence data in 'twobit' format.
3"""
4
5import sys
6import _twobit
7
8from struct import *
9from UserDict import DictMixin
10
11TWOBIT_MAGIC_NUMBER = 0x1A412743
12TWOBIT_MAGIC_NUMBER_SWAP = 0x4327411A
13TWOBIT_MAGIC_SIZE = 4
14
15TWOBIT_VERSION = 0
16
17class TwoBitSequence( object ):
18    def __init__( self, tbf, header_offset=None ):
19        self.tbf = tbf
20        self.header_offset = header_offset
21        self.sequence_offset = None
22        self.size = None
23        self.n_blocks = None
24        self.masked_blocks = None
25        self.loaded = False
26       
27    def __getitem__( self, slice ):
28        start, stop, stride = slice.indices( self.size )
29        assert stride == 1, "Striding in slices not supported"
30        if stop - start < 1:
31            return ""
32        return _twobit.read( self.tbf.file, self, start, stop, self.tbf.do_mask  )
33       
34    def __len__( self ):
35        return self.size
36       
37    def get( self, start, end ):
38        # Trim start / stop
39        if start < 0:
40            start = 0
41        if end > seq.size:
42            end = seq.size
43        out_size = end - start
44        if out_size < 1:
45            raise Exception( "end before start (%s,%s)" % ( start,end ) )
46        # Find position of packed portion
47        dna = _twobit.read( self.tbf.file, self, start, end, self.tbf.do_mask )
48        # Return
49        return dna
50       
51class TwoBitFile( DictMixin ):
52    def __init__( self, file, do_mask=True ):
53        self.do_mask = do_mask
54        # Read magic and determine byte order
55        self.byte_order = ">"
56        magic = unpack( ">L", file.read( TWOBIT_MAGIC_SIZE ) )[0]
57        if magic != TWOBIT_MAGIC_NUMBER:
58            if magic == TWOBIT_MAGIC_NUMBER_SWAP:
59                self.byte_order = "<"
60            else:
61                raise Exception( "Not a NIB file" )
62        self.magic = magic
63        self.file = file
64        # Read version
65        self.version = self.read( "L" )
66        if self.version != TWOBIT_VERSION:
67            raise Exception( "File is version '%d' but I only know about '%d'" % ( self.version, TWOBIT_VERSION ) )
68        # Number of sequences in file
69        self.seq_count = self.read( "L" )
70        # Header contains some reserved space
71        self.reserved = self.read( "L" )
72        # Read index of sequence names to offsets
73        index = dict()
74        for i in range( self.seq_count ):
75            name = self.read_p_string()
76            offset = self.read( "L" )
77            index[name] = TwoBitSequence( self, offset )
78        self.index = index
79
80    def __getitem__( self, name ):
81        seq = self.index[name]
82        if not seq.loaded:
83            self.load_sequence( name )
84        return seq
85       
86    def keys( self ):
87        return self.index.keys()
88       
89    def load_sequence( self, name ):
90        seq = self.index[name]
91        # Seek to start of sequence block
92        self.file.seek( seq.header_offset )
93        # Size of sequence
94        seq.size = self.read( "L" )
95        # Read N and masked block regions
96        seq.n_block_starts, seq.n_block_sizes = self.read_block_coords()
97        seq.masked_block_starts, seq.masked_block_sizes = self.read_block_coords()
98        # Reserved
99        self.read( "L" )
100        # Save start of actualt sequence
101        seq.sequence_offset = self.file.tell()
102        # Mark as loaded
103        seq.loaded = True
104       
105    def read_block_coords( self ):
106        blocks = []
107        block_count = self.read( "L" )
108        if block_count == 0:
109            return [], []
110        starts = self.read( str( block_count ) + "L", untuple=False )
111        sizes = self.read( str( block_count ) + "L", untuple=False  )
112        return list( starts ), list( sizes )
113       
114    def read( self, pattern, untuple=True ):
115        rval = unpack( self.byte_order + pattern,
116                       self.file.read( calcsize( self.byte_order + pattern ) ) )
117        if untuple and len( rval ) == 1:
118            return rval[0]
119        return rval
120       
121    def read_p_string( self ):
122        """
123        Read a length-prefixed string
124        """
125        length = self.read( "B" )
126        return self.file.read( length )
Note: リポジトリブラウザについてのヘルプは TracBrowser を参照してください。