Source code for dynast.consensus

import datetime as dt
import os
from typing import List, Optional

import ngs_tools as ngs
from typing_extensions import Literal

from . import config, constants, preprocessing
from .logging import logger
from .stats import Stats


@logger.namespaced('consensus')
[docs]def consensus( bam_path: str, gtf_path: str, out_dir: str, strand: Literal['forward', 'reverse', 'unstranded'] = 'forward', umi_tag: Optional[str] = None, barcode_tag: Optional[str] = None, gene_tag: str = 'GX', barcodes: Optional[List[str]] = None, quality: int = 27, add_RS_RI: bool = False, n_threads: int = 8, temp_dir: Optional[str] = None, ): """Main interface for the `consensus` command. Args: bam_path: Path to BAM to call consensus from gtf_path: Path to GTF used to build STAR index out_dir: Path to output directory strand: Strandedness of the protocol umi_tag: BAM tag to use as UMIs barcode_tag: BAM tag to use as cell barcodes gene_tag: BAM tag to use as gene assignments barcodes: Only consider these barcodes quality: Quality threshold add_RS_RI: Add RS and RI tags to BAM. Mostly useful for debugging. n_threads: Number of threads to use temp_dir: Temporary directory """ stats = Stats() stats.start() stats_path = os.path.join( out_dir, f'{constants.STATS_PREFIX}_{dt.datetime.strftime(stats.start_time, "%Y%m%d_%H%M%S_%f")}.json' ) os.makedirs(out_dir, exist_ok=True) # Sort and index bam. bam_path = preprocessing.sort_and_index_bam( bam_path, '{}.sortedByCoord{}'.format(*os.path.splitext(bam_path)), n_threads=n_threads ) # Check BAM tags tags = preprocessing.get_tags_from_bam(bam_path, config.BAM_PEEK_READS, n_threads=n_threads) required_tags = config.BAM_REQUIRED_TAGS.copy() if barcode_tag: required_tags.append(barcode_tag) elif config.BAM_BARCODE_TAG in tags: logger.warning( f'BAM contains reads with {config.BAM_BARCODE_TAG} tag. Are you sure ' f'you didn\'t mean to provide `--barcode-tag {config.BAM_BARCODE_TAG}`?' ) elif config.BAM_READGROUP_TAG in tags: logger.warning( f'BAM contains reads with {config.BAM_READGROUP_TAG} tag. Are you sure ' f'you didn\'t mean to provide `--barcode-tag {config.BAM_READGROUP_TAG}`?' ) if umi_tag: required_tags.append(umi_tag) elif config.BAM_UMI_TAG in tags: logger.warning( f'BAM contains reads with {config.BAM_UMI_TAG} tag. Are you sure ' f'you didn\'t mean to provide `--umi-tag {config.BAM_UMI_TAG}`?' ) if gene_tag: required_tags.append(gene_tag) elif config.BAM_GENE_TAG in tags: logger.warning( f'BAM contains reads with {config.BAM_GENE_TAG} tag. Are you sure ' f'you didn\'t mean to provide `--gene-tag {config.BAM_GENE_TAG}`?' ) missing_tags = set(required_tags) - tags if missing_tags: raise Exception( f'First {config.BAM_PEEK_READS} reads in the BAM do not contain the following required tags: ' f'{", ".join(missing_tags)}. ' ) # Check BAM alignments if preprocessing.check_bam_contains_secondary(bam_path, config.BAM_PEEK_READS, n_threads=n_threads): logger.warning( 'BAM contains secondary alignments, which will be ignored. Only primary ' 'alignments are considered.' ) if preprocessing.check_bam_contains_unmapped(bam_path): logger.warning('BAM contains unmapped reads, which will be ignored.') if preprocessing.check_bam_contains_duplicate(bam_path, config.BAM_PEEK_READS, n_threads=n_threads): logger.warning('BAM contains duplicate reads, which will be ignored.') logger.info('Parsing gene and transcript information from GTF') gene_infos, transcript_infos = ngs.gtf.genes_and_transcripts_from_gtf(gtf_path, use_version=False) consensus_path = os.path.join(out_dir, constants.CONSENSUS_BAM_FILENAME) logger.info(f'Calling consensus sequences from BAM to {consensus_path}') preprocessing.call_consensus( bam_path, consensus_path, gene_infos, strand=strand, umi_tag=umi_tag, barcode_tag=barcode_tag, gene_tag=gene_tag, barcodes=barcodes, quality=quality, add_RS_RI=add_RS_RI, temp_dir=temp_dir, n_threads=n_threads ) stats.end() stats.save(stats_path)