|
| 1 | +import unittest |
| 2 | + |
| 3 | +from pathlib import Path |
| 4 | +from tempfile import NamedTemporaryFile |
| 5 | +from util import TEST_DATA_PATH |
| 6 | + |
| 7 | +import numpy as np |
| 8 | + |
| 9 | +from newmap.main import (DEFAULT_SUFFIX_ARRAY_COMPRESSION_RATIO, |
| 10 | + DEFAULT_KMER_LENGTH_IN_SEED_TABLE) |
| 11 | +from newmap.generate_index import generate_fm_index |
| 12 | +from newmap.unique_counts import write_unique_counts |
| 13 | + |
| 14 | +# Expected minimum unique lengths at each position |
| 15 | +# NB: In order to manually count correctly, it is important to remember to |
| 16 | +# count the reverse complement of that k-mer as well. |
| 17 | +EXPECTED_CHR1_COUNTS = [0, 10, 9, 8, 7, 6, 5, 4, 4, 4, 6, 5, 4, 4, |
| 18 | + 4, 0, 0, 0, 0, 0] |
| 19 | +EXPECTED_CHR2_COUNTS = [10, 10, 9, 8, 7, 6, 5, 4, 4, 4, 0, 0, 0, 0, |
| 20 | + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, |
| 21 | + 0, 0] |
| 22 | + |
| 23 | + |
| 24 | +class TestCountKmers(unittest.TestCase): |
| 25 | + def setUp(self): |
| 26 | + self.genome_index_filename = NamedTemporaryFile(mode="w").name |
| 27 | + self.fasta_filename = str(TEST_DATA_PATH / 'genome.fa') |
| 28 | + generate_fm_index(self.fasta_filename, |
| 29 | + self.genome_index_filename, |
| 30 | + DEFAULT_SUFFIX_ARRAY_COMPRESSION_RATIO, |
| 31 | + DEFAULT_KMER_LENGTH_IN_SEED_TABLE) |
| 32 | + self.num_threads = 1 |
| 33 | + |
| 34 | + def test_binary_search(self): |
| 35 | + write_unique_counts(Path(self.fasta_filename), |
| 36 | + Path(self.genome_index_filename), |
| 37 | + 15, # Batch size |
| 38 | + list(range(4, 11)), # Kmer lengths 4 to 10 |
| 39 | + 0, # Initial search length |
| 40 | + [], # Include chr ids |
| 41 | + [], # Exclude chr ids |
| 42 | + self.num_threads, |
| 43 | + use_binary_search=True) |
| 44 | + |
| 45 | + # Check the results in chr1.unique.uint8 and chr2.unique.uint8 |
| 46 | + chr1_results = np.fromfile('chr1.unique.uint8', dtype=np.uint8) |
| 47 | + chr2_results = np.fromfile('chr2.unique.uint8', dtype=np.uint8) |
| 48 | + |
| 49 | + self.assertTrue(chr1_results.size == 20) |
| 50 | + self.assertTrue(chr2_results.size == 30) |
| 51 | + |
| 52 | + self.assertTrue(np.array_equal(chr1_results, EXPECTED_CHR1_COUNTS)) |
| 53 | + self.assertTrue(np.array_equal(chr2_results, EXPECTED_CHR2_COUNTS)) |
0 commit comments