1 | """ |
---|
2 | Support for scoring alignments using arbitrary scoring matrices, arbitrary |
---|
3 | alphabets, and affine gap penalties. |
---|
4 | """ |
---|
5 | |
---|
6 | from numpy import * |
---|
7 | |
---|
8 | class ScoringScheme( object ): |
---|
9 | # note that gap_open and gap_extend are penalties, which means you should make them positive |
---|
10 | def __init__( self, gap_open, gap_extend, default=-100, alphabet1="ACGT", alphabet2=None, gap1="-", gap2=None, text1_range=128, text2_range=None, typecode=int32 ): |
---|
11 | if (text2_range == None): text2_range = text1_range |
---|
12 | if (alphabet2 == None): alphabet2 = alphabet1 |
---|
13 | if (gap2 == None): gap2 = gap1 # (scheme with gap1=gap2=None is legit) |
---|
14 | if type(alphabet1) == str: alphabet1 = [ch for ch in alphabet1] |
---|
15 | if type(alphabet2) == str: alphabet2 = [ch for ch in alphabet2] |
---|
16 | self.table = ones( (text1_range, text2_range), typecode ) |
---|
17 | self.table *= default |
---|
18 | self.gap_open = gap_open |
---|
19 | self.gap_extend = gap_extend |
---|
20 | self.gap1 = gap1 |
---|
21 | self.gap2 = gap2 |
---|
22 | self.alphabet1 = alphabet1 |
---|
23 | self.alphabet2 = alphabet2 |
---|
24 | # private _set_score and _get_score allow subclasses to override them to |
---|
25 | # implement a different underlying table object |
---|
26 | def _set_score(self,(a,b),val): |
---|
27 | self.table[a,b] = val |
---|
28 | def _get_score(self,(a,b)): |
---|
29 | return self.table[a,b] |
---|
30 | def set_score( self, a, b, val, foldcase1=False, foldcase2=False ): |
---|
31 | self._set_score((a,b),val) |
---|
32 | if foldcase1: |
---|
33 | aCh = chr(a) |
---|
34 | if (aCh.isupper()): aa = ord(aCh.lower()) |
---|
35 | elif (aCh.islower()): aa = ord(aCh.upper()) |
---|
36 | else: foldcase1 = False |
---|
37 | if foldcase2: |
---|
38 | bCh = chr(b) |
---|
39 | if (bCh.isupper()): bb = ord(bCh.lower()) |
---|
40 | elif (bCh.islower()): bb = ord(bCh.upper()) |
---|
41 | else: foldcase2 = False |
---|
42 | if foldcase1 and foldcase2: |
---|
43 | self._set_score((aa,b ),val) |
---|
44 | self._set_score((a ,bb),val) |
---|
45 | self._set_score((aa,bb),val) |
---|
46 | elif foldcase1: |
---|
47 | self._set_score((aa,b ),val) |
---|
48 | elif foldcase2: |
---|
49 | self._set_score((a ,bb),val) |
---|
50 | def score_alignment( self, a ): |
---|
51 | return score_alignment(self,a) |
---|
52 | def score_texts( self, text1, text2 ): |
---|
53 | return score_texts( self, text1, text2 ) |
---|
54 | def __str__ (self): |
---|
55 | isDna1 = "".join( self.alphabet1 ) == "ACGT" |
---|
56 | isDna2 = "".join( self.alphabet2 ) == "ACGT" |
---|
57 | labelRows = not ( isDna1 and isDna2 ) |
---|
58 | width = 3 |
---|
59 | for a in self.alphabet1: |
---|
60 | for b in self.alphabet2: |
---|
61 | score = self._get_score((ord(a),ord(b))) |
---|
62 | if (type(score) == float): s = "%8.6f" % score |
---|
63 | else: s = "%s" % score |
---|
64 | if (len(s)+1 > width): |
---|
65 | width = len(s)+1 |
---|
66 | lines = [] |
---|
67 | line = [] |
---|
68 | if labelRows: |
---|
69 | if isDna1: line.append(" ") |
---|
70 | else: line.append(" ") |
---|
71 | for b in self.alphabet2: |
---|
72 | if isDna2: s = b |
---|
73 | else: s = "%02X" % ord(b) |
---|
74 | line.append("%*s" % (width,s)) |
---|
75 | lines.append(("".join(line))+"\n") |
---|
76 | for a in self.alphabet1: |
---|
77 | line = [] |
---|
78 | if labelRows: |
---|
79 | if isDna1: line.append(a) |
---|
80 | else: line.append("%02X" % ord(a)) |
---|
81 | for b in self.alphabet2: |
---|
82 | score = self._get_score((ord(a),ord(b))) |
---|
83 | if (type(score) == float): s = "%8.6f" % score |
---|
84 | else: s = "%s" % score |
---|
85 | line.append("%*s" % (width,s)) |
---|
86 | lines.append(("".join(line))+"\n") |
---|
87 | return "".join(lines) |
---|
88 | |
---|
89 | def read_scoring_scheme( f, gap_open, gap_extend, gap1="-", gap2=None, **kwargs ): |
---|
90 | """ |
---|
91 | Initialize scoring scheme from a file containint a blastz style text blob. |
---|
92 | f can be either a file or the name of a file. |
---|
93 | """ |
---|
94 | close_it = False |
---|
95 | if (type(f) == str): |
---|
96 | f = file(f,"rt") |
---|
97 | close_it = True |
---|
98 | ss = build_scoring_scheme("".join([line for line in f]),gap_open, gap_extend, gap1=gap1, gap2=gap2, **kwargs) |
---|
99 | if (close_it): |
---|
100 | f.close() |
---|
101 | return ss |
---|
102 | |
---|
103 | def build_scoring_scheme( s, gap_open, gap_extend, gap1="-", gap2=None, **kwargs ): |
---|
104 | """ |
---|
105 | Initialize scoring scheme from a blastz style text blob, first line |
---|
106 | specifies the bases for each row/col, subsequent lines contain the |
---|
107 | corresponding scores. Slaw extensions allow for unusual and/or |
---|
108 | asymmetric alphabets. Symbols can be two digit hex, and each row |
---|
109 | begins with symbol. Note that a row corresponds to a symbol in text1 |
---|
110 | and a column to a symbol in text2. |
---|
111 | |
---|
112 | examples: |
---|
113 | |
---|
114 | blastz slaw |
---|
115 | |
---|
116 | A C G T 01 02 A C G T |
---|
117 | 91 -114 -31 -123 01 200 -200 -50 100 -50 100 |
---|
118 | -114 100 -125 -31 02 -200 200 100 -50 100 -50 |
---|
119 | -31 -125 100 -114 |
---|
120 | -123 -31 -114 91 |
---|
121 | """ |
---|
122 | # perform initial parse to determine alphabets and locate scores |
---|
123 | bad_matrix = "invalid scoring matrix" |
---|
124 | s = s.rstrip( "\n" ) |
---|
125 | lines = s.split( "\n" ) |
---|
126 | rows = [] |
---|
127 | symbols2 = lines.pop(0).split() |
---|
128 | symbols1 = None |
---|
129 | rows_have_syms = False |
---|
130 | a_la_blastz = True |
---|
131 | for i, line in enumerate( lines ): |
---|
132 | row_scores = line.split() |
---|
133 | if len( row_scores ) == len( symbols2 ): # blastz-style row |
---|
134 | if symbols1 == None: |
---|
135 | if len( lines ) != len( symbols2 ): |
---|
136 | raise bad_matrix |
---|
137 | symbols1 = symbols2 |
---|
138 | elif (rows_have_syms): |
---|
139 | raise bad_matrix |
---|
140 | elif len( row_scores ) == len( symbols2 ) + 1: # row starts with symbol |
---|
141 | if symbols1 == None: |
---|
142 | symbols1 = [] |
---|
143 | rows_have_syms = True |
---|
144 | a_la_blastz = False |
---|
145 | elif not rows_have_syms: |
---|
146 | raise bad_matrix |
---|
147 | symbols1.append( row_scores.pop(0) ) |
---|
148 | else: |
---|
149 | raise bad_matrix |
---|
150 | rows.append( row_scores ) |
---|
151 | # convert alphabets from strings to characters |
---|
152 | try: |
---|
153 | alphabet1 = [sym_to_char( sym ) for sym in symbols1] |
---|
154 | alphabet2 = [sym_to_char( sym ) for sym in symbols2] |
---|
155 | except ValueError: |
---|
156 | raise bad_matrix |
---|
157 | if (alphabet1 != symbols1) or (alphabet2 != symbols2): |
---|
158 | a_la_blastz = False |
---|
159 | if a_la_blastz: |
---|
160 | alphabet1 = [ch.upper() for ch in alphabet1] |
---|
161 | alphabet2 = [ch.upper() for ch in alphabet2] |
---|
162 | # decide if rows and/or columns should reflect case |
---|
163 | if a_la_blastz: |
---|
164 | foldcase1 = foldcase2 = True |
---|
165 | else: |
---|
166 | foldcase1 = "".join( alphabet1 ) == "ACGT" |
---|
167 | foldcase2 = "".join( alphabet2 ) == "ACGT" |
---|
168 | # create appropriately sized matrix |
---|
169 | text1_range = text2_range = 128 |
---|
170 | if ord( max( alphabet1 ) ) >= 128: text1_range = 256 |
---|
171 | if ord( max( alphabet2 ) ) >= 128: text2_range = 256 |
---|
172 | typecode = int32 |
---|
173 | for i, row_scores in enumerate( rows ): |
---|
174 | for j, score in enumerate( map( int_or_float, row_scores ) ): |
---|
175 | if type( score ) == float: |
---|
176 | typecode = float32 |
---|
177 | if type( gap_open ) == float: |
---|
178 | typecode = float32 |
---|
179 | if type( gap_extend ) == float: |
---|
180 | typecode = float32 |
---|
181 | ss = ScoringScheme( gap_open, gap_extend, alphabet1=alphabet1, alphabet2=alphabet2, gap1=gap1, gap2=gap2, text1_range=text1_range, text2_range=text2_range, typecode=typecode, **kwargs ) |
---|
182 | # fill matrix |
---|
183 | for i, row_scores in enumerate( rows ): |
---|
184 | for j, score in enumerate( map( int_or_float, row_scores ) ): |
---|
185 | ss.set_score( ord( alphabet1[i] ), ord( alphabet2[j] ), score ) |
---|
186 | if foldcase1 and foldcase2: |
---|
187 | ss.set_score( ord( alphabet1[i].lower() ), ord( alphabet2[j].upper() ), score ) |
---|
188 | ss.set_score( ord( alphabet1[i].upper() ), ord( alphabet2[j].lower() ), score ) |
---|
189 | ss.set_score( ord( alphabet1[i].lower() ), ord( alphabet2[j].lower() ), score ) |
---|
190 | elif foldcase1: |
---|
191 | ss.set_score( ord( alphabet1[i].lower() ), ord( alphabet2[j] ), score ) |
---|
192 | elif foldcase2: |
---|
193 | ss.set_score( ord( alphabet1[i] ), ord( alphabet2[j].lower() ), score ) |
---|
194 | return ss |
---|
195 | |
---|
196 | def int_or_float( s ): |
---|
197 | try: return int( s ) |
---|
198 | except: return float( s ) |
---|
199 | |
---|
200 | # convert possible two-char symbol to a single character |
---|
201 | def sym_to_char( sym ): |
---|
202 | if len( sym ) == 1: return sym |
---|
203 | elif len( sym ) != 2: raise ValueError |
---|
204 | else: return chr(int(sym,base=16)) |
---|
205 | |
---|
206 | def score_alignment( scoring_scheme, a ): |
---|
207 | score = 0 |
---|
208 | ncomps = len( a.components ) |
---|
209 | for i in range( ncomps ): |
---|
210 | for j in range( i+1, ncomps ): |
---|
211 | score += score_texts( scoring_scheme, a.components[i].text, a.components[j].text ) |
---|
212 | return score |
---|
213 | |
---|
214 | def score_texts( scoring_scheme, text1, text2 ): |
---|
215 | rval = 0 |
---|
216 | last_gap_a = last_gap_b = False |
---|
217 | for i in range( len( text1 ) ): |
---|
218 | a = text1[i] |
---|
219 | b = text2[i] |
---|
220 | # Ignore gap/gap pair |
---|
221 | if a == scoring_scheme.gap1 and b == scoring_scheme.gap2: |
---|
222 | continue |
---|
223 | # Gap in first species |
---|
224 | elif a == scoring_scheme.gap1: |
---|
225 | rval -= scoring_scheme.gap_extend |
---|
226 | if not last_gap_a: |
---|
227 | rval -= scoring_scheme.gap_open |
---|
228 | last_gap_a = True |
---|
229 | last_gap_b = False |
---|
230 | # Gap in second species |
---|
231 | elif b == scoring_scheme.gap2: |
---|
232 | rval -= scoring_scheme.gap_extend |
---|
233 | if not last_gap_b: |
---|
234 | rval -= scoring_scheme.gap_open |
---|
235 | last_gap_a = False |
---|
236 | last_gap_b = True |
---|
237 | # Aligned base |
---|
238 | else: |
---|
239 | rval += scoring_scheme._get_score((ord(a),ord(b))) |
---|
240 | last_gap_a = last_gap_b = False |
---|
241 | return rval |
---|
242 | |
---|
243 | def accumulate_scores( scoring_scheme, text1, text2, skip_ref_gaps=False ): |
---|
244 | """ |
---|
245 | Return cumulative scores for each position in alignment as a 1d array. |
---|
246 | |
---|
247 | If `skip_ref_gaps` is False positions in returned array correspond to each |
---|
248 | column in alignment, if True they correspond to each non-gap position (each |
---|
249 | base) in text1. |
---|
250 | """ |
---|
251 | if skip_ref_gaps: |
---|
252 | rval = zeros( len( text1 ) - text1.count( scoring_scheme.gap1 ) ) |
---|
253 | else: |
---|
254 | rval = zeros( len( text1 ) ) |
---|
255 | score = 0 |
---|
256 | pos = 0 |
---|
257 | last_gap_a = last_gap_b = False |
---|
258 | for i in range( len( text1 ) ): |
---|
259 | a = text1[i] |
---|
260 | b = text2[i] |
---|
261 | # Ignore gap/gap pair |
---|
262 | if a == scoring_scheme.gap1 and b == scoring_scheme.gap2: |
---|
263 | continue |
---|
264 | # Gap in first species |
---|
265 | elif a == scoring_scheme.gap1: |
---|
266 | score -= scoring_scheme.gap_extend |
---|
267 | if not last_gap_a: |
---|
268 | score -= scoring_scheme.gap_open |
---|
269 | last_gap_a = True |
---|
270 | last_gap_b = False |
---|
271 | # Gap in second species |
---|
272 | elif b == scoring_scheme.gap2: |
---|
273 | score -= scoring_scheme.gap_extend |
---|
274 | if not last_gap_b: |
---|
275 | score -= scoring_scheme.gap_open |
---|
276 | last_gap_a = False |
---|
277 | last_gap_b = True |
---|
278 | # Aligned base |
---|
279 | else: |
---|
280 | score += scoring_scheme._get_score((ord(a),ord(b))) |
---|
281 | last_gap_a = last_gap_b = False |
---|
282 | if not( skip_ref_gaps ) or a != scoring_scheme.gap1: |
---|
283 | rval[pos] = score |
---|
284 | pos += 1 |
---|
285 | return rval |
---|
286 | |
---|
287 | hox70 = build_scoring_scheme( """ A C G T |
---|
288 | 91 -114 -31 -123 |
---|
289 | -114 100 -125 -31 |
---|
290 | -31 -125 100 -114 |
---|
291 | -123 -31 -114 91 """, 400, 30 ) |
---|