| 1 | """ |
|---|
| 2 | Tests for `bx.align.maf.sitemask`. |
|---|
| 3 | """ |
|---|
| 4 | |
|---|
| 5 | import sys,tempfile |
|---|
| 6 | import unittest |
|---|
| 7 | from StringIO import StringIO |
|---|
| 8 | import cpg |
|---|
| 9 | import bx.align.maf |
|---|
| 10 | |
|---|
| 11 | test_maf_cpg = """##maf version=1 scoring=none |
|---|
| 12 | a score=0 |
|---|
| 13 | s apple 34 64 + 110 AGGGA---GTTCGTCACT------GTCGTAAGGGTTCAGA--CTGTCTATGTATACACAAGTTGTGTTGCA--ACCG |
|---|
| 14 | s orange 19 61 - 100 AGGGATGCGTT--TCACTGCTATCGTCGTA----TTCAGACTTCG-CTATCT------GAGTTGT---GCATTACCG |
|---|
| 15 | """ |
|---|
| 16 | |
|---|
| 17 | cpg_inclusive_result = [ |
|---|
| 18 | "##maf,version=1", |
|---|
| 19 | "a,score=0", |
|---|
| 20 | "s,apple,34,64,+,110,AGGGA---GTTCGTCACT------GT##TAAGGGTTCAGA--CTGTCTATGTATACACAAGTTGTGTTGCA--ACCG", |
|---|
| 21 | "s,orange,19,61,-,100,AGGGATG#GTT--TCACTGCTAT#GT##TA----TTCAGACTTCG-CTATCT------GAGTTGT---GCATTACCG" |
|---|
| 22 | ] |
|---|
| 23 | |
|---|
| 24 | cpg_restricted_result = [ |
|---|
| 25 | "##maf,version=1", |
|---|
| 26 | "a,score=0", |
|---|
| 27 | "s,apple,34,64,+,110,A##GA---#TT##TC#C#------#T##TA###GTTC#GA--C##TC#A#G#ATAC####GT#G#GT#GC#--AC#G", |
|---|
| 28 | "s,orange,19,61,-,100,A##GA#G##TT--TC#C#GC#AT##T##TA----TTC#GAC#T##-C#A#C#------##GT#G#---GC#TTAC#G" |
|---|
| 29 | ] |
|---|
| 30 | |
|---|
| 31 | noncpg_result = [ |
|---|
| 32 | "##maf,version=1", |
|---|
| 33 | "a,score=0", |
|---|
| 34 | "s,apple,34,64,+,110,#GG##---G##CG##A#T------G#CG##AGG####A##--#TG##T#T#T####ACAA##T#T##T##A--##CG", |
|---|
| 35 | "s,orange,19,61,-,100,#GG##T#CG##--##A#T##T##CG#CG##----###A###T#CG-#T#T#T------GA##T#T---##A####CG" |
|---|
| 36 | ] |
|---|
| 37 | |
|---|
| 38 | def test_cpg_inclusive(): |
|---|
| 39 | reader = bx.align.maf.Reader( StringIO( test_maf_cpg ) ) |
|---|
| 40 | out = tempfile.NamedTemporaryFile('w') |
|---|
| 41 | writer = bx.align.maf.Writer( out ) |
|---|
| 42 | cpgfilter = cpg.Inclusive( mask='#' ) |
|---|
| 43 | cpgfilter.run( reader, writer.write ) |
|---|
| 44 | out.seek(0) |
|---|
| 45 | j=0 |
|---|
| 46 | for line in file(out.name): |
|---|
| 47 | line = line.strip() |
|---|
| 48 | if not(line): |
|---|
| 49 | continue |
|---|
| 50 | assert cpg_inclusive_result[j] == ",".join(line.split()) |
|---|
| 51 | j+=1 |
|---|
| 52 | |
|---|
| 53 | def test_cpg_restricted(): |
|---|
| 54 | reader = bx.align.maf.Reader( StringIO( test_maf_cpg ) ) |
|---|
| 55 | out = tempfile.NamedTemporaryFile('w') |
|---|
| 56 | writer = bx.align.maf.Writer( out ) |
|---|
| 57 | cpgfilter = cpg.Restricted( mask='#' ) |
|---|
| 58 | cpgfilter.run( reader, writer.write ) |
|---|
| 59 | out.seek(0) |
|---|
| 60 | j=0 |
|---|
| 61 | for line in file(out.name): |
|---|
| 62 | line = line.strip() |
|---|
| 63 | if not(line): |
|---|
| 64 | continue |
|---|
| 65 | assert cpg_restricted_result[j] == ",".join(line.split()) |
|---|
| 66 | j+=1 |
|---|
| 67 | |
|---|
| 68 | def test_non_cpg(): |
|---|
| 69 | reader = bx.align.maf.Reader( StringIO( test_maf_cpg ) ) |
|---|
| 70 | out = tempfile.NamedTemporaryFile('w') |
|---|
| 71 | writer = bx.align.maf.Writer( out ) |
|---|
| 72 | cpgfilter = cpg.nonCpG( mask='#' ) |
|---|
| 73 | cpgfilter.run( reader, writer.write ) |
|---|
| 74 | out.seek(0) |
|---|
| 75 | j=0 |
|---|
| 76 | for line in file(out.name): |
|---|
| 77 | line = line.strip() |
|---|
| 78 | if not(line): |
|---|
| 79 | continue |
|---|
| 80 | assert noncpg_result[j] == ",".join(line.split()) |
|---|
| 81 | j+=1 |
|---|
| 82 | |
|---|