[3] | 1 | """ |
---|
| 2 | Tests for `bx.misc.seekbzip2`. |
---|
| 3 | """ |
---|
| 4 | |
---|
| 5 | import tempfile |
---|
| 6 | import commands |
---|
| 7 | import os |
---|
| 8 | import random |
---|
| 9 | from itertools import * |
---|
| 10 | |
---|
| 11 | import seekbzip2 |
---|
| 12 | import bz2 |
---|
| 13 | |
---|
# F: path to a bzip2-compressed file used by the chunked-read and random-seek
# tests. Those tests are only defined when F is set and the file exists.
F=None
# T: path to a bzip2-compressed text file used by the line-reading tests.
# Those tests are only defined when T is set and the file exists.
T=None
---|
| 16 | |
---|
| 17 | #F="/Users/james/work/seek-bzip2/test_random.dat.bz2" |
---|
| 18 | #T="/Users/james/cache/hg18/align/multiz28way/chr10.maf.bz2" |
---|
| 19 | |
---|
| 20 | #F=/depot/data1/cache/human/hg18/align/multiz28way/chr1.maf.bz2 |
---|
| 21 | |
---|
| 22 | import sys |
---|
| 23 | |
---|
# The binary tests need an external fixture, so they are only defined when
# `F` points at an existing file.
if F and os.path.exists( F ):

    def test_linear_reading():
        """
        Read the fixture sequentially in fixed-size chunks through
        `SeekableBzip2File` and compare every chunk, and the offset reported
        by `tell()`, against the data decompressed by the standard `bz2`
        module.
        """
        # Reference copy of the decompressed data; close the handle even if
        # the read itself fails.
        raw_file = bz2.BZ2File( F )
        try:
            raw_data = raw_file.read()
        finally:
            raw_file.close()
        # NOTE(review): the second argument (F + "t") is presumably the
        # companion index/table file for the fixture -- confirm against
        # seekbzip2.
        f = seekbzip2.SeekableBzip2File( F, F + "t" )
        try:
            chunk = 1221
            pos = 0
            # The "+ 1" adds a final iteration that covers the trailing
            # partial (possibly empty) chunk at end of file.
            for _ in range( ( len( raw_data ) // chunk ) + 1 ):
                expected = raw_data[ pos : pos + chunk ]
                actual = f.read( chunk )
                assert expected == actual
                pos += chunk
                # tell() must never report a position past the end of data.
                assert f.tell() == min( pos, len( raw_data ) )
        finally:
            # Release the handle even when an assertion above fails.
            f.close()

    def test_random_seeking():
        """
        Seek to ten random offsets, read a short random-length chunk at each,
        and compare the bytes and the resulting `tell()` position against the
        data decompressed by the standard `bz2` module.

        The fixture must decompress to more than 100 bytes; otherwise
        `random.randrange` is called with a non-positive bound and raises
        ValueError.
        """
        raw_file = bz2.BZ2File( F )
        try:
            raw_data = raw_file.read()
        finally:
            raw_file.close()
        f = seekbzip2.SeekableBzip2File( F, F + "t" )
        try:
            for i in range( 10 ):
                # Stay at least 100 bytes short of the end so the 10-19 byte
                # read below always lies fully inside the data.
                seek_to = random.randrange( len( raw_data ) - 100 )
                chunk = random.randrange( 10, 20 )

                f.seek( seek_to )
                a = f.read( chunk )
                b = raw_data[ seek_to : seek_to + chunk ]

                assert a == b, "'%s' != '%s' on %dth attempt" % ( a.encode("hex"), b.encode("hex"), i )

                assert f.tell() == min( seek_to + chunk, len( raw_data ) )
        finally:
            # Release the handle even when an assertion above fails.
            f.close()
---|
| 54 | |
---|
# The text tests need an external fixture, so they are only defined when
# `T` points at an existing file.
if T and os.path.exists( T ):

    def test_text_reading():
        """
        Iterate over the fixture line by line through `SeekableBzip2File` and
        through the standard `bz2.BZ2File` in lockstep, checking that each
        pair of lines is equal and that `tell()` equals the running total of
        line lengths.
        """
        raw_file = bz2.BZ2File( T )
        try:
            # NOTE(review): the second argument (T + "t") is presumably the
            # companion index/table file for the fixture -- confirm against
            # seekbzip2.
            f = seekbzip2.SeekableBzip2File( T, T + "t" )
            try:
                pos = 0
                # izip stops at the shorter iterator, so a stream that ends
                # early is not detected by this loop.
                for i, ( line, raw_line ) in enumerate( izip( f, raw_file ) ):
                    assert line == raw_line, "%d: %r != %r" % ( i, line.rstrip( "\n" ), raw_line )
                    pos += len( line )
                    ftell = f.tell()
                    assert ftell == pos, "%d != %d" % ( ftell, pos )
            finally:
                # Release the handle even when an assertion above fails.
                f.close()
        finally:
            # The reference reader was previously never closed.
            raw_file.close()

    def test_text_reading_2():
        """
        Read the fixture with repeated `readline()` calls until an empty
        string is returned, comparing each line (with trailing CR/LF removed)
        against the reference data split on newlines, and checking that
        `tell()` equals the running total of line lengths.
        """
        raw_file = bz2.BZ2File( T )
        try:
            raw_data = raw_file.read()
        finally:
            raw_file.close()
        f = seekbzip2.SeekableBzip2File( T, T + "t" )
        try:
            raw_lines = raw_data.split( "\n" )
            pos = 0
            i = 0
            while True:
                line = f.readline()
                # readline() returning an empty string ends the loop.
                if line == "":
                    break
                assert line.rstrip( "\r\n" ) == raw_lines[i], "%r != %r" % ( line.rstrip( "\r\n" ), raw_lines[i] )
                pos += len( line )
                ftell = f.tell()
                assert ftell == pos, "%d != %d" % ( ftell, pos )
                i += 1
        finally:
            # Release the handle even when an assertion above fails.
            f.close()
---|
| 86 | |
---|