| 1 | """ |
|---|
| 2 | Access to files containing sequence data in 'twobit' format. |
|---|
| 3 | """ |
|---|
| 4 | |
|---|
| 5 | import sys |
|---|
| 6 | import _twobit |
|---|
| 7 | |
|---|
| 8 | from struct import * |
|---|
| 9 | from UserDict import DictMixin |
|---|
| 10 | |
|---|
| 11 | TWOBIT_MAGIC_NUMBER = 0x1A412743 |
|---|
| 12 | TWOBIT_MAGIC_NUMBER_SWAP = 0x4327411A |
|---|
| 13 | TWOBIT_MAGIC_SIZE = 4 |
|---|
| 14 | |
|---|
| 15 | TWOBIT_VERSION = 0 |
|---|
| 16 | |
|---|
| 17 | class TwoBitSequence( object ): |
|---|
| 18 | def __init__( self, tbf, header_offset=None ): |
|---|
| 19 | self.tbf = tbf |
|---|
| 20 | self.header_offset = header_offset |
|---|
| 21 | self.sequence_offset = None |
|---|
| 22 | self.size = None |
|---|
| 23 | self.n_blocks = None |
|---|
| 24 | self.masked_blocks = None |
|---|
| 25 | self.loaded = False |
|---|
| 26 | |
|---|
| 27 | def __getitem__( self, slice ): |
|---|
| 28 | start, stop, stride = slice.indices( self.size ) |
|---|
| 29 | assert stride == 1, "Striding in slices not supported" |
|---|
| 30 | if stop - start < 1: |
|---|
| 31 | return "" |
|---|
| 32 | return _twobit.read( self.tbf.file, self, start, stop, self.tbf.do_mask ) |
|---|
| 33 | |
|---|
| 34 | def __len__( self ): |
|---|
| 35 | return self.size |
|---|
| 36 | |
|---|
| 37 | def get( self, start, end ): |
|---|
| 38 | # Trim start / stop |
|---|
| 39 | if start < 0: |
|---|
| 40 | start = 0 |
|---|
| 41 | if end > seq.size: |
|---|
| 42 | end = seq.size |
|---|
| 43 | out_size = end - start |
|---|
| 44 | if out_size < 1: |
|---|
| 45 | raise Exception( "end before start (%s,%s)" % ( start,end ) ) |
|---|
| 46 | # Find position of packed portion |
|---|
| 47 | dna = _twobit.read( self.tbf.file, self, start, end, self.tbf.do_mask ) |
|---|
| 48 | # Return |
|---|
| 49 | return dna |
|---|
| 50 | |
|---|
| 51 | class TwoBitFile( DictMixin ): |
|---|
| 52 | def __init__( self, file, do_mask=True ): |
|---|
| 53 | self.do_mask = do_mask |
|---|
| 54 | # Read magic and determine byte order |
|---|
| 55 | self.byte_order = ">" |
|---|
| 56 | magic = unpack( ">L", file.read( TWOBIT_MAGIC_SIZE ) )[0] |
|---|
| 57 | if magic != TWOBIT_MAGIC_NUMBER: |
|---|
| 58 | if magic == TWOBIT_MAGIC_NUMBER_SWAP: |
|---|
| 59 | self.byte_order = "<" |
|---|
| 60 | else: |
|---|
| 61 | raise Exception( "Not a NIB file" ) |
|---|
| 62 | self.magic = magic |
|---|
| 63 | self.file = file |
|---|
| 64 | # Read version |
|---|
| 65 | self.version = self.read( "L" ) |
|---|
| 66 | if self.version != TWOBIT_VERSION: |
|---|
| 67 | raise Exception( "File is version '%d' but I only know about '%d'" % ( self.version, TWOBIT_VERSION ) ) |
|---|
| 68 | # Number of sequences in file |
|---|
| 69 | self.seq_count = self.read( "L" ) |
|---|
| 70 | # Header contains some reserved space |
|---|
| 71 | self.reserved = self.read( "L" ) |
|---|
| 72 | # Read index of sequence names to offsets |
|---|
| 73 | index = dict() |
|---|
| 74 | for i in range( self.seq_count ): |
|---|
| 75 | name = self.read_p_string() |
|---|
| 76 | offset = self.read( "L" ) |
|---|
| 77 | index[name] = TwoBitSequence( self, offset ) |
|---|
| 78 | self.index = index |
|---|
| 79 | |
|---|
| 80 | def __getitem__( self, name ): |
|---|
| 81 | seq = self.index[name] |
|---|
| 82 | if not seq.loaded: |
|---|
| 83 | self.load_sequence( name ) |
|---|
| 84 | return seq |
|---|
| 85 | |
|---|
| 86 | def keys( self ): |
|---|
| 87 | return self.index.keys() |
|---|
| 88 | |
|---|
| 89 | def load_sequence( self, name ): |
|---|
| 90 | seq = self.index[name] |
|---|
| 91 | # Seek to start of sequence block |
|---|
| 92 | self.file.seek( seq.header_offset ) |
|---|
| 93 | # Size of sequence |
|---|
| 94 | seq.size = self.read( "L" ) |
|---|
| 95 | # Read N and masked block regions |
|---|
| 96 | seq.n_block_starts, seq.n_block_sizes = self.read_block_coords() |
|---|
| 97 | seq.masked_block_starts, seq.masked_block_sizes = self.read_block_coords() |
|---|
| 98 | # Reserved |
|---|
| 99 | self.read( "L" ) |
|---|
| 100 | # Save start of actualt sequence |
|---|
| 101 | seq.sequence_offset = self.file.tell() |
|---|
| 102 | # Mark as loaded |
|---|
| 103 | seq.loaded = True |
|---|
| 104 | |
|---|
| 105 | def read_block_coords( self ): |
|---|
| 106 | blocks = [] |
|---|
| 107 | block_count = self.read( "L" ) |
|---|
| 108 | if block_count == 0: |
|---|
| 109 | return [], [] |
|---|
| 110 | starts = self.read( str( block_count ) + "L", untuple=False ) |
|---|
| 111 | sizes = self.read( str( block_count ) + "L", untuple=False ) |
|---|
| 112 | return list( starts ), list( sizes ) |
|---|
| 113 | |
|---|
| 114 | def read( self, pattern, untuple=True ): |
|---|
| 115 | rval = unpack( self.byte_order + pattern, |
|---|
| 116 | self.file.read( calcsize( self.byte_order + pattern ) ) ) |
|---|
| 117 | if untuple and len( rval ) == 1: |
|---|
| 118 | return rval[0] |
|---|
| 119 | return rval |
|---|
| 120 | |
|---|
| 121 | def read_p_string( self ): |
|---|
| 122 | """ |
|---|
| 123 | Read a length-prefixed string |
|---|
| 124 | """ |
|---|
| 125 | length = self.read( "B" ) |
|---|
| 126 | return self.file.read( length ) |
|---|