| 1 | """ |
|---|
| 2 | Tests for `bx.misc.seekbzip2`. |
|---|
| 3 | """ |
|---|
| 4 | |
|---|
| 5 | import tempfile |
|---|
| 6 | import commands |
|---|
| 7 | import os |
|---|
| 8 | import random |
|---|
| 9 | from itertools import * |
|---|
| 10 | |
|---|
| 11 | import seekbzip2 |
|---|
| 12 | import bz2 |
|---|
| 13 | |
|---|
| 14 | F=None |
|---|
| 15 | T=None |
|---|
| 16 | |
|---|
| 17 | #F="/Users/james/work/seek-bzip2/test_random.dat.bz2" |
|---|
| 18 | #T="/Users/james/cache/hg18/align/multiz28way/chr10.maf.bz2" |
|---|
| 19 | |
|---|
| 20 | #F="/depot/data1/cache/human/hg18/align/multiz28way/chr1.maf.bz2" |
|---|
| 21 | |
|---|
| 22 | import sys |
|---|
| 23 | |
|---|
| 24 | if F and os.path.exists( F ): |
|---|
| 25 | |
|---|
def test_linear_reading():
    """Read `F` sequentially in fixed-size chunks and compare against `bz2`.

    The reference data is obtained by decompressing `F` completely with
    `bz2.BZ2File`.  The same file is then opened with
    `seekbzip2.SeekableBzip2File` (second argument `F + "t"`, presumably
    the companion seek-table file -- confirm against `seekbzip2`) and read
    front to back.  Every chunk and every `tell()` result must agree with
    the reference data.
    """
    # Load the reference data, releasing the bz2 handle even if read() fails.
    raw_file = bz2.BZ2File( F )
    try:
        raw_data = raw_file.read()
    finally:
        raw_file.close()

    chunk = 1221
    f = seekbzip2.SeekableBzip2File( F, F + "t" )
    try:
        # Offsets 0, chunk, 2*chunk, ... up to the largest multiple of
        # `chunk` that is <= len(raw_data).  That is len // chunk + 1 reads,
        # so the last read is short, or empty when the length is an exact
        # multiple of `chunk`.
        for pos in range( 0, len( raw_data ) + 1, chunk ):
            expected = raw_data[ pos : pos + chunk ]
            actual = f.read( chunk )
            assert expected == actual
            # tell() must never run past the end of the decompressed data.
            assert f.tell() == min( pos + chunk, len( raw_data ) )
    finally:
        # Close the reader even when an assertion above fails.
        f.close()
|---|
| 38 | |
|---|
def test_random_seeking():
    """Seek to random offsets in `F` and compare short reads against `bz2`.

    Ten times, a random offset and a random read length of 10-19 bytes are
    chosen.  The bytes returned by `seekbzip2.SeekableBzip2File` after
    `seek()` must equal the same slice of the fully decompressed reference
    data, and `tell()` must report the position just past the read.

    `F` must decompress to more than 100 bytes; otherwise
    `random.randrange` raises `ValueError` for the empty range.
    """
    # Load the reference data, releasing the bz2 handle even if read() fails.
    raw_file = bz2.BZ2File( F )
    try:
        raw_data = raw_file.read()
    finally:
        raw_file.close()

    f = seekbzip2.SeekableBzip2File( F, F + "t" )
    try:
        for i in range( 10 ):
            # Offsets stay more than 100 bytes away from the end, so a read
            # of at most 19 bytes never crosses the end of the data.
            seek_to = random.randrange( len( raw_data ) - 100 )
            chunk = random.randrange( 10, 20 )

            f.seek( seek_to )
            a = f.read( chunk )
            b = raw_data[ seek_to : seek_to + chunk ]

            assert a == b, "'%s' != '%s' on %dth attempt" % ( a.encode("hex"), b.encode("hex"), i )

            assert f.tell() == min( seek_to + chunk, len( raw_data ) )
    finally:
        # Close the reader even when an assertion above fails.
        f.close()
|---|
| 54 | |
|---|
| 55 | if T and os.path.exists( T ): |
|---|
| 56 | |
|---|
def test_text_reading():
    """Iterate `T` line by line in lockstep with `bz2` and compare.

    Both `seekbzip2.SeekableBzip2File` and `bz2.BZ2File` are iterated
    together, so the reference data is streamed rather than loaded into
    memory at once.  Each line must match, and after each line `tell()`
    must equal the running total of line lengths.

    NOTE: `izip` stops at the shorter of the two iterators, so this test
    does not detect one reader yielding fewer lines than the other.
    """
    raw_file = bz2.BZ2File( T )
    try:
        f = seekbzip2.SeekableBzip2File( T, T + "t" )
        try:
            pos = 0
            for i, ( line, raw_line ) in enumerate( izip( f, raw_file ) ):
                assert line == raw_line, "%d: %r != %r" % ( i, line.rstrip( "\n" ), raw_line )
                # Track the expected offset from the lines consumed so far.
                pos += len( line )
                ftell = f.tell()
                assert ftell == pos, "%d != %d" % ( ftell, pos )
        finally:
            # Close the seekable reader even when an assertion fails.
            f.close()
    finally:
        # The reference reader stays open for the whole loop; release it last.
        raw_file.close()
|---|
| 69 | |
|---|
| 70 | |
|---|
def test_text_reading_2():
    """Read `T` with `readline()` and compare against the split reference.

    The reference data is decompressed completely with `bz2.BZ2File` and
    split on newlines.  `seekbzip2.SeekableBzip2File.readline()` is then
    called until it returns an empty string.  Each line, with trailing
    newline characters stripped, must equal the reference line at the same
    index, and `tell()` must equal the running total of line lengths.
    """
    # Load the reference data, releasing the bz2 handle even if read() fails.
    raw_file = bz2.BZ2File( T )
    try:
        raw_data = raw_file.read()
    finally:
        raw_file.close()
    raw_lines = raw_data.split( "\n" )

    f = seekbzip2.SeekableBzip2File( T, T + "t" )
    try:
        pos = 0
        i = 0
        while True:
            line = f.readline()
            # An empty result marks end of data.
            if not line:
                break
            assert line.rstrip( "\r\n" ) == raw_lines[i], "%r != %r" % ( line.rstrip( "\r\n" ), raw_lines[i] )
            # Offsets count the unstripped line, terminator included.
            pos += len( line )
            ftell = f.tell()
            assert ftell == pos, "%d != %d" % ( ftell, pos )
            i += 1
    finally:
        # Close the reader even when an assertion above fails.
        f.close()
|---|
| 86 | |
|---|