1 | """ |
---|
2 | Tests for `bx.misc.seekbzip2`. |
---|
3 | """ |
---|
4 | |
---|
5 | import tempfile |
---|
6 | import commands |
---|
7 | import os |
---|
8 | import random |
---|
9 | from itertools import * |
---|
10 | |
---|
11 | import seekbzip2 |
---|
12 | import bz2 |
---|
13 | |
---|
# Paths of the bzip2 fixtures used by this module.  Both default to None;
# the test functions further down are only defined when the corresponding
# path is set and exists on disk.
#   F -- file exercised by the chunked-read and random-seek tests
#   T -- file exercised by the line-oriented reading tests
F = T = None

# Example values (commented out):
# F="/Users/james/work/seek-bzip2/test_random.dat.bz2"
# T="/Users/james/cache/hg18/align/multiz28way/chr10.maf.bz2"

# F=/depot/data1/cache/human/hg18/align/multiz28way/chr1.maf.bz2
---|
21 | |
---|
22 | import sys |
---|
23 | |
---|
# The tests in this group are only defined when the fixture path F is set
# and points at an existing file.
if F and os.path.exists(F):

    def test_linear_reading():
        """Read the file front to back in fixed-size chunks.

        Each chunk returned by ``SeekableBzip2File.read`` must equal the
        matching slice of the data decompressed with ``bz2.BZ2File``, and
        ``tell()`` must equal the offset reached so far, capped at the
        length of the decompressed data.
        """
        reference = bz2.BZ2File(F)
        try:
            raw_data = reference.read()
        finally:
            reference.close()

        chunk = 1221
        total = len(raw_data)
        # Second argument is F + "t" (presumably the index table that
        # accompanies F -- confirm against seekbzip2).
        f = seekbzip2.SeekableBzip2File(F, F + "t")
        try:
            # Offsets 0, chunk, 2*chunk, ... up to and including the last
            # one that is <= total.  When total is an exact multiple of
            # chunk the final iteration checks that a read at end-of-data
            # returns an empty string.
            for pos in range(0, total + 1, chunk):
                expected = raw_data[pos:pos + chunk]
                actual = f.read(chunk)
                assert actual == expected, "mismatch at offset %d" % pos
                assert f.tell() == min(pos + chunk, total)
        finally:
            # Release the handle even when an assertion above fails.
            f.close()

    def test_random_seeking():
        """Seek to ten random offsets and compare a short read at each.

        After every ``seek``/``read`` pair the bytes must equal the same
        slice of the reference data and ``tell()`` must sit just past the
        bytes that were read.
        """
        reference = bz2.BZ2File(F)
        try:
            raw_data = reference.read()
        finally:
            reference.close()

        total = len(raw_data)
        # Seek targets stay at least 100 bytes before the end, as before,
        # but randrange() raises ValueError on an empty range, so fixtures
        # of 100 bytes or fewer fall back to seeking to offset 0.
        seek_limit = max(1, total - 100)
        f = seekbzip2.SeekableBzip2File(F, F + "t")
        try:
            for i in range(10):
                seek_to = random.randrange(seek_limit)
                chunk = random.randrange(10, 20)

                f.seek(seek_to)
                a = f.read(chunk)
                b = raw_data[seek_to:seek_to + chunk]

                assert a == b, "'%s' != '%s' on %dth attempt" % (a.encode("hex"), b.encode("hex"), i)
                assert f.tell() == min(seek_to + chunk, total)
        finally:
            # Release the handle even when an assertion above fails.
            f.close()
---|
54 | |
---|
# The tests in this group are only defined when the fixture path T is set
# and points at an existing file.
if T and os.path.exists(T):

    def test_text_reading():
        """Iterate both readers in lockstep and compare them line by line.

        Every line yielded by ``SeekableBzip2File`` must equal the line
        yielded by ``bz2.BZ2File``, and ``tell()`` must equal the summed
        length of the lines consumed so far.
        """
        raw_file = bz2.BZ2File(T)
        try:
            # Second argument is T + "t" (presumably the index table that
            # accompanies T -- confirm against seekbzip2).
            f = seekbzip2.SeekableBzip2File(T, T + "t")
            try:
                pos = 0
                # izip() would stop silently at the shorter iterator and
                # hide truncated or extra output; izip_longest() pads the
                # shorter side with None so a length mismatch is reported.
                for i, (line, raw_line) in enumerate(izip_longest(f, raw_file)):
                    assert line is not None and raw_line is not None, \
                        "readers disagree on the number of lines at line %d" % i
                    assert line == raw_line, "%d: %r != %r" % (i, line.rstrip("\n"), raw_line)
                    pos += len(line)
                    ftell = f.tell()
                    assert ftell == pos, "%d != %d" % (ftell, pos)
            finally:
                f.close()
        finally:
            raw_file.close()

    def test_text_reading_2():
        """Read with ``readline()`` and compare against the split raw data.

        The reference is the fully decompressed data split on ``"\\n"``.
        Each ``readline()`` result, minus its single trailing newline, must
        match the corresponding element, and ``tell()`` must equal the
        summed length of the lines read so far.
        """
        reference = bz2.BZ2File(T)
        try:
            raw_data = reference.read()
        finally:
            reference.close()

        raw_lines = raw_data.split("\n")
        # Data ending in "\n" leaves one trailing empty element that does
        # not correspond to any line readline() can return.
        if raw_lines[-1] == "":
            expected_count = len(raw_lines) - 1
        else:
            expected_count = len(raw_lines)

        f = seekbzip2.SeekableBzip2File(T, T + "t")
        try:
            pos = 0
            i = 0
            while True:
                line = f.readline()
                if not line:
                    break
                assert i < expected_count, "readline() returned more than %d lines" % expected_count
                # Drop only the one "\n" that split() consumed.  Stripping
                # "\r\n" would also remove a "\r" that raw_lines[i] still
                # contains and fail spuriously on CRLF data.
                content = line[:-1] if line.endswith("\n") else line
                assert content == raw_lines[i], "%r != %r" % (content, raw_lines[i])
                pos += len(line)
                ftell = f.tell()
                assert ftell == pos, "%d != %d" % (ftell, pos)
                i += 1
            assert i == expected_count, "read %d lines, expected %d" % (i, expected_count)
        finally:
            # Release the handle even when an assertion above fails.
            f.close()
---|
86 | |
---|