Source code for dynast.preprocessing.coverage

import multiprocessing
import re
from collections import Counter
from functools import partial
from typing import Dict, List, Optional, Set, Tuple

import pysam

from .. import utils
from ..logging import logger

[docs]COVERAGE_PARSER = re.compile( r'''^ (?P<contig>[^,]*), (?P<genome_i>[^,]*), (?P<coverage>[^,]*)\n $''', re.VERBOSE
)
[docs]def read_coverage(coverage_path: str) -> Dict[str, Dict[int, int]]: """Read coverage CSV as a dictionary. Args: coverage_path: Path to coverage CSV Returns: Coverage as a nested dictionary """ coverage = {} with open(coverage_path, 'r') as f: for i, line in enumerate(f): groups = COVERAGE_PARSER.match(line).groupdict() try: contig, genome_i, count = groups['contig'], int(groups['genome_i']), int(groups['coverage']) coverage.setdefault(contig, {})[genome_i] = count except ValueError as e: if i == 0: continue raise e return coverage
[docs]def calculate_coverage_contig( counter: multiprocessing.Value, lock: multiprocessing.Lock, bam_path: str, contig: str, indices: List[Tuple[int, int, int]], alignments: Set[Tuple[str, int]] = None, umi_tag: Optional[str] = None, barcode_tag: Optional[str] = None, gene_tag: str = 'GX', barcodes: Optional[List[str]] = None, temp_dir: Optional[str] = None, update_every: int = 50000, velocity: bool = True ) -> str: """Calculate converage for a specific contig. This function is designed to be called as a separate process. Args: counter: Counter that keeps track of how many reads have been processed lock: Semaphore for the `counter` so that multiple processes do not modify it at the same time bam_path: Path to alignment BAM file contig: Only reads that map to this contig will be processed indices: Genomic positions to consider alignments: Set of (read_id, alignment_index) tuples to process. All alignments are processed if this option is not provided. umi_tag: BAM tag that encodes UMI, if not provided, `NA` is output in the `umi` column barcode_tag: BAM tag that encodes cell barcode, if not provided, `NA` is output in the `barcode` column gene_tag: BAM tag that encodes gene assignment, defaults to `GX` barcodes: List of barcodes to be considered. All barcodes are considered if not provided temp_dir: Path to temporary directory update_every: Update the counter every this many reads velocity: Whether or not velocities were assigned Returns: Path to coverage CSV """ def skip_alignment(read, tags): return read.is_secondary or read.is_unmapped or read.is_duplicate or any(not read.has_tag(tag) for tag in tags) coverage_path = utils.mkstemp(dir=temp_dir) paired = {} coverage = Counter() required_tags = [] if not velocity: required_tags.append(gene_tag) if umi_tag: required_tags.append(umi_tag) if barcode_tag: required_tags.append(barcode_tag) n = 0 with pysam.AlignmentFile(bam_path, 'rb', threads=2) as bam, \ open(coverage_path, 'w') as coverage_out: for read in bam.fetch(contig): n += 1 if n == update_every: with lock: counter.value += update_every n = 0 if len(coverage) > 1000000: # NOTE: dictionary keys are sorted by insertion order for genome_i in list(coverage.keys()): if genome_i < read.reference_start: coverage_out.write(f'{contig},{genome_i},{coverage[genome_i]}\n') del coverage[genome_i] else: break if skip_alignment(read, required_tags): continue barcode = read.get_tag(barcode_tag) if barcode_tag else 'NA' if barcode == '-' or (barcodes and barcode not in barcodes): continue read_id = read.query_name alignment_index = read.get_tag('HI') key = (read_id, alignment_index) if alignments and key not in alignments: continue reference_positions = read.get_reference_positions() # For paired end reads, don't count overlap twice if read.is_paired: if key not in paired: paired[key] = reference_positions continue reference_positions = list(set(reference_positions + paired[key])) del paired[key] for genome_i in reference_positions: if genome_i in indices: coverage[genome_i] += 1 for genome_i, cover in coverage.items(): coverage_out.write(f'{contig},{genome_i},{cover}\n') with lock: counter.value += n del coverage del indices if alignments: del alignments if barcodes: del barcodes return coverage_path
[docs]def calculate_coverage( bam_path: str, conversions: Dict[str, Set[int]], coverage_path: str, alignments: Optional[List[Tuple[str, int]]] = None, umi_tag: Optional[str] = None, barcode_tag: Optional[str] = None, gene_tag: str = 'GX', barcodes: Optional[List[str]] = None, temp_dir: Optional[str] = None, velocity: bool = True, ) -> str: """Calculate coverage of each genomic position per barcode. Args: bam_path: Path to alignment BAM file conversions: Dictionary of contigs as keys and sets of genomic positions as values that indicates positions where conversions were observed coverage_path: Path to write coverage CSV alignments: Set of (read_id, alignment_index) tuples to process. All alignments are processed if this option is not provided. umi_tag: BAM tag that encodes UMI, if not provided, `NA` is output in the `umi` column barcode_tag: BAM tag that encodes cell barcode, if not provided, `NA` is output in the `barcode` column gene_tag: BAM tag that encodes gene assignment barcodes: List of barcodes to be considered. All barcodes are considered if not provided temp_dir: Path to temporary directory velocity: Whether or not velocities were assigned Returns: Path to coverage CSV """ logger.debug(f'Extracting contigs from BAM {bam_path}') contigs = [] n_reads = 0 with pysam.AlignmentFile(bam_path, 'rb') as bam: for index in bam.get_index_statistics(): n_reads += index.total if index.total > 0: contigs.append(index.contig) # Turns out coverage calculation per read is fast enough that we end # up being IO-bound if we use multiprocessing. So, we just process # sequentially (per contig) here instead. # We create a new pool because we still want to be able to keep track of # the current progress. pool, counter, lock = utils.make_pool_with_counter(1) async_result = pool.starmap_async( partial( calculate_coverage_contig, counter, lock, bam_path, alignments=alignments, umi_tag=umi_tag, barcode_tag=barcode_tag, gene_tag=gene_tag, barcodes=barcodes, velocity=velocity ), [(contig, conversions.get(contig, set())) for contig in contigs] ) pool.close() # Display progres bar utils.display_progress_with_counter(counter, n_reads, async_result) pool.join() logger.debug('Combining results') coverage = {} for coverage_part_path in async_result.get(): cov = read_coverage(coverage_part_path) coverage = utils.merge_dictionaries(coverage, cov) logger.debug(f'Writing coverage to {coverage_path}') with open(coverage_path, 'w') as coverage_out: coverage_out.write('contig,genome_i,coverage\n') for (contig, genome_i), cover in utils.flatten_dictionary(coverage): coverage_out.write(f'{contig},{genome_i},{cover}\n') return coverage_path