1 | """ |
---|
2 | Tools for fusing contiguous alignment blocks together. |
---|
3 | """ |
---|
4 | |
---|
5 | from itertools import * |
---|
6 | from copy import deepcopy |
---|
7 | |
---|
def fuse_list( mafs ):
    """
    Try to fuse a list of blocks by progressively fusing each adjacent pair.

    This is a generator. It walks `mafs` in order while holding on to one
    pending block. Each following block is handed to `fuse` together with
    the pending one: if they fuse, the fused block becomes the new pending
    block; otherwise the pending block is yielded and the current block
    takes its place. Once the input is exhausted the pending block (if any)
    is yielded, so an empty input yields nothing.
    """
    last = None
    for m in mafs:
        if last is None:
            # Nothing pending yet, just remember this block
            last = m
            continue
        fused = fuse( last, m )
        # `fuse` signals failure with None. Compare by identity so that a
        # block which happens to evaluate as false is not mistaken for a
        # failed fuse.
        if fused is not None:
            last = fused
        else:
            yield last
            last = m
    # Identity check again: a trailing block must be emitted even if it
    # evaluates as false.
    if last is not None:
        yield last
---|
25 | |
---|
def fuse( m1, m2 ):
    """
    Attempt to fuse two blocks. If they can be fused returns a new block,
    otherwise returns None.

    Two blocks are fused only when they have the same (non-zero) number of
    components and, for every pair of corresponding components, `src` and
    `strand` are equal and the first component's `end` equals the second
    component's `start`.

    The result is a deep copy of `m1` in which each component's `text` and
    `size` have been extended by those of the matching component of `m2`,
    and whose `text_size` is the length of the first component's text.
    Neither `m1` nor `m2` is modified.

    Example:

    >>> import bx.align.maf

    >>> block1 = bx.align.maf.from_string( '''
    ... a score=0.0
    ... s hg18.chr10 52686 44 + 135374737 GTGCTAACTTACTGCTCCACAGAAAACATCAATTCTGCTCATGC
    ... s panTro1.chrUn_random 208115356 44 - 240967748 GTGCTAACTGACTGCTCCAGAGAAAACATCAATTCTGTTCATGT
    ... ''' )

    >>> block2 = bx.align.maf.from_string( '''
    ... a score=0.0
    ... s hg18.chr10 52730 69 + 135374737 GCAGGTACAATTCATCAAGAAAGGAATTACAACTTCAGAAATGTGTTCAAAATATATCCATACTTTGAC
    ... s panTro1.chrUn_random 208115400 69 - 240967748 GCAGCTACTATTCATCAAGAAAGGGATTACAACTTCAGAAATGTGTTCAAAGTGTATCCATACTTTGAT
    ... ''' )

    >>> fused = fuse( block1, block2 )

    >>> print( fused )
    a score=0.0
    s hg18.chr10 52686 113 + 135374737 GTGCTAACTTACTGCTCCACAGAAAACATCAATTCTGCTCATGCGCAGGTACAATTCATCAAGAAAGGAATTACAACTTCAGAAATGTGTTCAAAATATATCCATACTTTGAC
    s panTro1.chrUn_random 208115356 113 - 240967748 GTGCTAACTGACTGCTCCAGAGAAAACATCAATTCTGTTCATGTGCAGCTACTATTCATCAAGAAAGGGATTACAACTTCAGAAATGTGTTCAAAGTGTATCCATACTTTGAT
    <BLANKLINE>
    """
    # Check if the blocks are adjacent, return none if not.
    if len( m1.components ) != len( m2.components ):
        return None
    # With no components there is nothing to establish adjacency from (and
    # no first component to take the text size from), so do not fuse.
    if not m1.components:
        return None
    # Builtin zip (rather than itertools.izip) so this runs on Python 2 and 3
    for c1, c2 in zip( m1.components, m2.components ):
        if c1.src != c2.src:
            return None
        if c1.strand != c2.strand:
            return None
        if c1.end != c2.start:
            return None
    # Try to fuse: work on a copy so the caller's m1 is left untouched
    n = deepcopy( m1 )
    for c1, c2 in zip( n.components, m2.components ):
        c1.text += c2.text
        c1.size += c2.size
    n.text_size = len( n.components[0].text )
    return n
---|
68 | |
---|
class FusingAlignmentWriter( object ):
    """
    Wrapper for an alignment Writer which attempts to fuse adjacent blocks

    Blocks passed to `write` are held back one at a time: each new block is
    first offered to `fuse` together with the pending block, and only when
    they cannot be fused is the pending block forwarded to the wrapped
    writer. `close` must be called to flush the final pending block; it
    also closes the wrapped writer.
    """
    def __init__( self, maf_writer ):
        self.maf_writer = maf_writer
        # Block received but not yet forwarded to `maf_writer`
        self.last = None
    def write( self, m ):
        """
        Accept block `m`, fusing it onto the pending block when possible.
        """
        # Identity checks (not truthiness) so that a block which evaluates
        # as false is neither overwritten nor mistaken for a failed fuse.
        if self.last is None:
            self.last = m
            return
        fused = fuse( self.last, m )
        if fused is not None:
            self.last = fused
        else:
            self.maf_writer.write( self.last )
            self.last = m
    def close( self ):
        """
        Write out the pending block, if any, and close the wrapped writer.
        """
        if self.last is not None:
            self.maf_writer.write( self.last )
            # Clear it so a repeated close() cannot write the block twice
            self.last = None
        self.maf_writer.close()
---|