1 | """ |
---|
2 | Tests for `bx.align.maf.sitemask`. |
---|
3 | """ |
---|
4 | |
---|
5 | import sys,tempfile |
---|
6 | import unittest |
---|
7 | from StringIO import StringIO |
---|
8 | import cpg |
---|
9 | import bx.align.maf |
---|
10 | |
---|
11 | test_maf_cpg = """##maf version=1 scoring=none |
---|
12 | a score=0 |
---|
13 | s apple 34 64 + 110 AGGGA---GTTCGTCACT------GTCGTAAGGGTTCAGA--CTGTCTATGTATACACAAGTTGTGTTGCA--ACCG |
---|
14 | s orange 19 61 - 100 AGGGATGCGTT--TCACTGCTATCGTCGTA----TTCAGACTTCG-CTATCT------GAGTTGT---GCATTACCG |
---|
15 | """ |
---|
16 | |
---|
17 | cpg_inclusive_result = [ |
---|
18 | "##maf,version=1", |
---|
19 | "a,score=0", |
---|
20 | "s,apple,34,64,+,110,AGGGA---GTTCGTCACT------GT##TAAGGGTTCAGA--CTGTCTATGTATACACAAGTTGTGTTGCA--ACCG", |
---|
21 | "s,orange,19,61,-,100,AGGGATG#GTT--TCACTGCTAT#GT##TA----TTCAGACTTCG-CTATCT------GAGTTGT---GCATTACCG" |
---|
22 | ] |
---|
23 | |
---|
24 | cpg_restricted_result = [ |
---|
25 | "##maf,version=1", |
---|
26 | "a,score=0", |
---|
27 | "s,apple,34,64,+,110,A##GA---#TT##TC#C#------#T##TA###GTTC#GA--C##TC#A#G#ATAC####GT#G#GT#GC#--AC#G", |
---|
28 | "s,orange,19,61,-,100,A##GA#G##TT--TC#C#GC#AT##T##TA----TTC#GAC#T##-C#A#C#------##GT#G#---GC#TTAC#G" |
---|
29 | ] |
---|
30 | |
---|
31 | noncpg_result = [ |
---|
32 | "##maf,version=1", |
---|
33 | "a,score=0", |
---|
34 | "s,apple,34,64,+,110,#GG##---G##CG##A#T------G#CG##AGG####A##--#TG##T#T#T####ACAA##T#T##T##A--##CG", |
---|
35 | "s,orange,19,61,-,100,#GG##T#CG##--##A#T##T##CG#CG##----###A###T#CG-#T#T#T------GA##T#T---##A####CG" |
---|
36 | ] |
---|
37 | |
---|
38 | def test_cpg_inclusive(): |
---|
39 | reader = bx.align.maf.Reader( StringIO( test_maf_cpg ) ) |
---|
40 | out = tempfile.NamedTemporaryFile('w') |
---|
41 | writer = bx.align.maf.Writer( out ) |
---|
42 | cpgfilter = cpg.Inclusive( mask='#' ) |
---|
43 | cpgfilter.run( reader, writer.write ) |
---|
44 | out.seek(0) |
---|
45 | j=0 |
---|
46 | for line in file(out.name): |
---|
47 | line = line.strip() |
---|
48 | if not(line): |
---|
49 | continue |
---|
50 | assert cpg_inclusive_result[j] == ",".join(line.split()) |
---|
51 | j+=1 |
---|
52 | |
---|
53 | def test_cpg_restricted(): |
---|
54 | reader = bx.align.maf.Reader( StringIO( test_maf_cpg ) ) |
---|
55 | out = tempfile.NamedTemporaryFile('w') |
---|
56 | writer = bx.align.maf.Writer( out ) |
---|
57 | cpgfilter = cpg.Restricted( mask='#' ) |
---|
58 | cpgfilter.run( reader, writer.write ) |
---|
59 | out.seek(0) |
---|
60 | j=0 |
---|
61 | for line in file(out.name): |
---|
62 | line = line.strip() |
---|
63 | if not(line): |
---|
64 | continue |
---|
65 | assert cpg_restricted_result[j] == ",".join(line.split()) |
---|
66 | j+=1 |
---|
67 | |
---|
68 | def test_non_cpg(): |
---|
69 | reader = bx.align.maf.Reader( StringIO( test_maf_cpg ) ) |
---|
70 | out = tempfile.NamedTemporaryFile('w') |
---|
71 | writer = bx.align.maf.Writer( out ) |
---|
72 | cpgfilter = cpg.nonCpG( mask='#' ) |
---|
73 | cpgfilter.run( reader, writer.write ) |
---|
74 | out.seek(0) |
---|
75 | j=0 |
---|
76 | for line in file(out.name): |
---|
77 | line = line.strip() |
---|
78 | if not(line): |
---|
79 | continue |
---|
80 | assert noncpg_result[j] == ",".join(line.split()) |
---|
81 | j+=1 |
---|
82 | |
---|